[SERVER-57345] Retryable write pre-fetch missing documents Created: 01/Jun/21  Updated: 29/Oct/23  Resolved: 08/Jun/21

Status: Closed
Project: Core Server
Component/s: Replication
Affects Version/s: None
Fix Version/s: 5.1.0-rc0

Type: Bug Priority: Major - P3
Reporter: Wenbin Zhu Assignee: Wenbin Zhu
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
related to SERVER-56631 Retryable write pre-fetch phase could... Closed
Backwards Compatibility: Fully Compatible
Operating System: ALL
Sprint: Repl 2021-06-14
Participants:

 Description   

During the investigation of SERVER-56631, another issue with retryable write pre-fetch surfaced. The issue occurs when we pause after the recipient connects to the donor secondary as its sync source, then perform retryable writes on the donor primary and wait for the sync source secondaries to replicate them and advance the committed snapshot to the last retryable write statement. Unlike SERVER-56631, where the committed snapshot is not at the secondary's batch boundary, here the committed snapshot is at the batch boundary and is exactly the last retryable write statement.

When we then resume the migration and pre-fetch the retryable write statements, the pre-fetch result is randomly missing some documents in the middle. As a result, after the tenant migration completes and the previous retryable writes are retried on the recipient, we expect none of the statements to be executed again, but because of the documents missing from the pre-fetch, those missing statements are re-executed, which is incorrect.

However, if we advance the timestamp by writing something else after the retryable write on the donor primary (essentially advancing the committed snapshot past the last retryable write statement), the pre-fetched result is correct. We suspect there is a problem in the aggregation pipeline used to pre-fetch retryable writes.

 

Here is the test script to repro this issue:

/**
 * @tags: [
 *   requires_fcv_50,
 *   requires_majority_read_concern,
 *   incompatible_with_eft,
 *   incompatible_with_windows_tls,
 *   incompatible_with_macos, requires_persistence
 * ]
 */
 
 (function() {
    load("jstests/replsets/libs/tenant_migration_test.js");
    load("jstests/replsets/libs/tenant_migration_util.js");
    load("jstests/libs/fail_point_util.js"); // For configureFailPoint().
    load("jstests/libs/uuid_util.js");       // For extractUUIDFromObject().
    load("jstests/libs/write_concern_util.js");
    
    const getRecipientCurrOp = function(conn, migrationId) {
        const res = conn.adminCommand({currentOp: true, desc: "tenant recipient migration"});
        assert.eq(res.inprog.length, 1);
        const currOp = res.inprog[0];
        assert.eq(bsonWoCompare(currOp.instanceID, migrationId), 0);
    
        return currOp;
    };
    
    const getDonorSyncSource = function(conn, migrationId) {
        const currOp = getRecipientCurrOp(conn, migrationId);
        return currOp.donorSyncSource;
    };
    
    const getStartFetchingDonorOpTime = function(conn, migrationId) {
        const currOp = getRecipientCurrOp(conn, migrationId);
        return currOp.startFetchingDonorOpTime;
    };
    
    const oplogApplierBatchSize = 50;
    
    const donorRst = new ReplSetTest({
        nodes: 3,
        // Force secondaries to sync from the primary to guarantee replication progress with the
        // stopReplProducerOnDocument failpoint. Also disable primary catchup because some
        // replicated retryable write statements are intentionally not being made majority
        // committed.
        settings: {chainingAllowed: false, catchUpTimeoutMillis: 0},
        nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, {
            setParameter: {
                tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000,
                // Allow non-timestamped reads on donor after migration completes for testing.
                'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
            }
        }),
    });
    donorRst.startSet();
    donorRst.initiateWithHighElectionTimeout();
    const donorPrimary = donorRst.getPrimary();
    
    if (!TenantMigrationUtil.isFeatureFlagEnabled(donorPrimary)) {
        jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
        donorRst.stopSet();
        return;
    }
    
    const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst: donorRst});
    
    const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
    const kTenantId = "testTenantId";
    const migrationId = UUID();
    const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB");
    const kCollName = "retryable_write_secondary_oplog_application";
    const kNs = `${kDbName}.${kCollName}`;
    
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(migrationId),
        tenantId: kTenantId,
        // Use secondary as sync source.
        readPreference: {mode: 'secondary'},
    };
    
    const fpAfterConnectingTenantMigrationRecipientInstance = configureFailPoint(
        recipientPrimary,
        "fpAfterConnectingTenantMigrationRecipientInstance",
        {action: "hang"});
    
    // Start tenant migration and hang after recipient connects to donor sync source.
    jsTestLog("Starting tenant migration.");
    assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
    fpAfterConnectingTenantMigrationRecipientInstance.wait();
    
    assert.commandWorked(
        donorPrimary.getCollection(kNs).insert({_id: 0, counter: 0}, {writeConcern: {w: 3}}));
    donorRst.awaitReplication();
    
    const counterTotal = oplogApplierBatchSize;
    const counterMajorityCommitted = counterTotal - 2;
    jsTestLog(`counterTotal: ${counterTotal}, counterMajorityCommitted: ${counterMajorityCommitted}`);
    
    // Perform all the retryable write statements on the donor primary.
    const lsid = ({id: UUID()});
    assert.commandWorked(donorPrimary.getCollection(kNs).runCommand("update", {
        updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
        lsid,
        txnNumber: NumberLong(1),
    }));
 
    donorRst.awaitReplication();
    
    const stmtTotal = donorPrimary.getCollection("local.oplog.rs")
                          .findOne({"o.diff.u.counter": counterTotal});
    const stmtMajorityCommitted = donorPrimary.getCollection("local.oplog.rs")
                                      .findOne({"o.diff.u.counter": counterMajorityCommitted});
    
    assert.neq(null, stmtTotal);
    assert.neq(null, stmtMajorityCommitted);
    jsTestLog(`stmtTotal timestamp: ${tojson(stmtTotal.ts)}`);
    jsTestLog(`stmtMajorityCommitted timestamp: ${tojson(stmtMajorityCommitted.ts)}`);
 
    // Un-commenting this would make the pre-fetch result correct.
    // assert.commandWorked(
    //     donorPrimary.getCollection(kNs).insert({_id: 1, data: 0}, {writeConcern: {w: 3}}));
    // donorRst.awaitReplication();
    
    for (const s of donorRst.getSecondaries()) {
        assert.soon(() => {
            const {optimes: {appliedOpTime, durableOpTime}} =
                assert.commandWorked(s.adminCommand({replSetGetStatus: 1}));
    
            print(`${s.host}: ${tojsononeline({
                appliedOpTime,
                durableOpTime,
                stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
            })}`);
    
            return bsonWoCompare(appliedOpTime.ts, stmtTotal.ts) >= 0 &&
                bsonWoCompare(durableOpTime.ts, stmtTotal.ts) >= 0;
        });
    }
    
    for (const s of donorRst.getSecondaries()) {
        assert.soon(() => {
            const {lastStableRecoveryTimestamp} =
                assert.commandWorked(s.adminCommand({replSetGetStatus: 1}));
        
            print(`${s.host}: ${tojsononeline({
                lastStableRecoveryTimestamp,
                stmtTotalTimestamp: stmtTotal.ts
            })}`);
        
            return bsonWoCompare(lastStableRecoveryTimestamp, stmtTotal.ts) >= 0;
        });
    }
 
    const secondary1 = donorRst.getSecondaries()[0];
    const primaryTxRes = donorPrimary.getCollection("config.transactions").find().toArray();
    jsTestLog(`donor primary txRes: ${tojson(primaryTxRes)}`);
    const secondaryTxRes = secondary1.getCollection("config.transactions").find().toArray();
    jsTestLog(`donor secondary txRes: ${tojson(secondaryTxRes)}`);
 
    const secondaryViewRes = secondary1.getCollection("local.system.tenantMigration.oplogView").find().readConcern("majority").toArray();
    jsTestLog(`donor secondary secondaryViewRes: ${tojson(secondaryViewRes)}`);
    const secondaryCounterRes = secondary1.getCollection(kNs).find().readConcern("majority").toArray();
    jsTestLog(`donor secondary secondaryCounterRes: ${tojson(secondaryCounterRes)}`);
 
    const fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime = configureFailPoint(
        recipientPrimary,
        "fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime",
        {action: "hang"});
    
    fpAfterConnectingTenantMigrationRecipientInstance.off();
    fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.wait();
    
    const startFetchingDonorOpTime = getStartFetchingDonorOpTime(recipientPrimary, migrationId);
    // assert.eq(startFetchingDonorOpTime.ts, stmtMajorityCommitted.ts);
    assert.gte(startFetchingDonorOpTime.ts, stmtTotal.ts);
    
    fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.off();
    
    TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
    assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
    
    const findRes = recipientPrimary.getCollection(kNs).findOne({_id: 0});
    jsTestLog(`findRes 1: ${tojson(findRes)}`);
    
    assert.commandWorked(recipientPrimary.getCollection(kNs).runCommand("update", {
        updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
        lsid,
        txnNumber: NumberLong(1),
    }));
    
    // Expect the counter to be unchanged from the previous result, i.e. no statements were re-executed.
    const findRes2 = recipientPrimary.getCollection(kNs).findOne({_id: 0});
    jsTestLog(`findRes 2: ${tojson(findRes2)}`);
    assert.eq(findRes, findRes2);
 
    const recipientLogs = recipientPrimary.getCollection("local.oplog.rs").find().toArray();
    jsTestLog(`recipient logs: ${tojson(recipientLogs)}`);
    
    donorRst.stopSet();
    tenantMigrationTest.stop();
})();



 Comments   
Comment by Vivian Ge (Inactive) [ 06/Oct/21 ]

Updating the fixVersion since branching activities occurred yesterday. This ticket will be in rc0 when it is triggered. For more active release information, please keep an eye on #server-release. Thank you!

Comment by Wenbin Zhu [ 08/Jun/21 ]

Fixed as part of SERVER-56631

Comment by Wenbin Zhu [ 08/Jun/21 ]

The fix will be included in SERVER-56631

Comment by Wenbin Zhu [ 08/Jun/21 ]

lingzhi.deng pavithra.vetriselvan After some investigation, I think the root cause is indeed the incorrect order of the filtering stage and the sorting stage in one particular edge case: when the startFetchingTimestamp is equal to the timestamp of the last retryable write. The sorting stage does not use $sort; it uses $reduce to perform something similar to a bucket sort, using $$this.depthForTenantMigration as the index into the "history" array returned by the preceding $graphLookup stage. This breaks in this edge case because, after the filtering stage, the entry with the largest timestamp (which is equal to startFetchingTimestamp) has been removed. So if we originally have 5 entries, the depthForTenantMigration values of the remaining entries after filtering span [1, 4] (depth 0 is filtered out), while the actual array indexes span [0, 3]. This index mismatch causes the sort stage to use incorrect indexes for its bucket sort.

As an example, say we originally have 5 entries: [E_0_4, E_1_3, E_2_2, E_3_1, E_4_0], where E_0_4 denotes the entry with timestamp 0 and depthForTenantMigration 4. Let's take a look at how the sort stage works:
 

1  {$set: {
2      history: {$reverseArray: {$reduce: {
3          input: "$history",
4          initialValue: {$range: [0, {$size: "$history"}]},
5          in: {$concatArrays: [
6              {$slice: ["$$value", "$$this.depthForTenantMigration"]},
7              ["$$this"],
8              {$slice: [
9                  "$$value",
10                 {$subtract: [
11                     {$add: ["$$this.depthForTenantMigration", 1]},
12                     {$size: "$history"},
13                 ]},
14             ]},
15         ]},
16     }}},
17 }}

The sort stage sorts the entries in ascending order of depthForTenantMigration. Each iteration of $reduce places the current entry at its corresponding index in the sorted array by stitching together three subarrays: the sorted left subarray (line 6), the current entry (line 7), and the sorted right subarray (lines 8-14).

Because of the filter stage, E_4_0 has been filtered out, so we have [E_0_4, E_1_3, E_2_2, E_3_1] and the $size of the "history" array becomes 4. Line 4 uses $range to create the array [0, 1, 2, 3] as the initial value. Suppose E_0_4 is the first entry processed by $reduce (not guaranteed, since $graphLookup does not guarantee order). Its depthForTenantMigration is 4, so the left subarray is [0, 1, 2, 3] and the right subarray is [0]: the $add and $subtract are supposed to produce a value n <= 0 so that $slice returns the last |n| values of the accumulator, but here we get the positive number 1 (4 + 1 - 4), so $slice incorrectly returns the first element instead. The result array after this iteration is [0, 1, 2, 3, E_0_4, 0]. This is already incorrect, but no data has been lost yet.

Next, processing E_1_3 with depthForTenantMigration 3, the left subarray is [0, 1, 2] and the right subarray is empty (3 + 1 - 4 = 0), so the result array after this iteration is [0, 1, 2, E_1_3] and E_0_4 is lost. Since $graphLookup does not guarantee the order of the returned array, which entries are lost is effectively random, which matches the behavior in the ticket description. If we reverse the two stages, the sorting stage processes the full array and the issue does not occur.
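For reference, below is a minimal standalone sketch of this index mismatch that can be run in a plain mongo shell. It does not touch the migration code path; the collection name (sortRepro) and the hard-coded history array are hypothetical, chosen to mirror the post-filter state in the example above. Running only the reduce/sort stage on the already-filtered array drops one of the remaining entries:

db.sortRepro.drop();
db.sortRepro.insertOne({
    // Simulates the state after the filtering stage: E_4_0 (ts 4, depth 0) has already been
    // removed, so the depths span [1, 4] while the array indexes span [0, 3].
    history: [
        {ts: 0, depthForTenantMigration: 4},
        {ts: 1, depthForTenantMigration: 3},
        {ts: 2, depthForTenantMigration: 2},
        {ts: 3, depthForTenantMigration: 1},
    ],
});

// The same reduce/sort stage quoted above, applied to the already-filtered array.
const sortedAfterFilter = db.sortRepro.aggregate([{$set: {
    history: {$reverseArray: {$reduce: {
        input: "$history",
        initialValue: {$range: [0, {$size: "$history"}]},
        in: {$concatArrays: [
            {$slice: ["$$value", "$$this.depthForTenantMigration"]},
            ["$$this"],
            {$slice: [
                "$$value",
                {$subtract: [
                    {$add: ["$$this.depthForTenantMigration", 1]},
                    {$size: "$history"},
                ]},
            ]},
        ]},
    }}},
}}]).toArray();

// The ts-0 entry is overwritten and a leftover placeholder index remains in the output.
printjson(sortedAfterFilter[0].history);

With this fixed input order the printed array is [E_1_3, E_2_2, E_3_1, 0]: the ts-0 entry has been lost and a placeholder 0 from the initial $range array is left behind. In the real pipeline the input order comes from $graphLookup, so which entries go missing varies from run to run, matching the randomly missing documents described above.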
 

Comment by Wenbin Zhu [ 03/Jun/21 ]

During the discussion of this bug, pavithra.vetriselvan suggested reversing the order of the filtering stage and the subsequent reduce/sorting stage, which seems to fix the bug. We are not yet certain of the exact explanation, but intuitively the filtering stage should come after the reduce/sorting stage. Will look further into this and confirm, and will also include the fix in SERVER-56631.
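To illustrate the suggested reordering, here is a hedged sketch that reuses the hypothetical sortRepro collection from the sketch above, this time with all five entries present. The $filter stage below is only a stand-in for the real filtering stage (in this scenario it drops the entry whose timestamp equals startFetchingTimestamp); the point is that when the reduce/sort stage runs on the unfiltered array, every depth maps to a valid index, and filtering afterwards removes only the intended entry:

db.sortRepro.drop();
db.sortRepro.insertOne({
    // Full, unfiltered history: depths [0, 4] line up exactly with indexes [0, 4].
    history: [
        {ts: 0, depthForTenantMigration: 4},
        {ts: 1, depthForTenantMigration: 3},
        {ts: 2, depthForTenantMigration: 2},
        {ts: 3, depthForTenantMigration: 1},
        {ts: 4, depthForTenantMigration: 0},
    ],
});

const startFetchingTimestamp = 4;  // Stand-in for startFetchingDonorOpTime.ts.

const sortedThenFiltered = db.sortRepro.aggregate([
    // 1. Sort first, on the full array, so no index mismatch can occur.
    {$set: {
        history: {$reverseArray: {$reduce: {
            input: "$history",
            initialValue: {$range: [0, {$size: "$history"}]},
            in: {$concatArrays: [
                {$slice: ["$$value", "$$this.depthForTenantMigration"]},
                ["$$this"],
                {$slice: [
                    "$$value",
                    {$subtract: [
                        {$add: ["$$this.depthForTenantMigration", 1]},
                        {$size: "$history"},
                    ]},
                ]},
            ]},
        }}},
    }},
    // 2. Filter afterwards: a stand-in for the real filtering stage, keeping only entries
    //    before startFetchingTimestamp.
    {$set: {
        history: {$filter: {
            input: "$history",
            cond: {$lt: ["$$this.ts", startFetchingTimestamp]},
        }},
    }},
]).toArray();

// Expected output: [E_0_4, E_1_3, E_2_2, E_3_1] in timestamp order, with no entries lost.
printjson(sortedThenFiltered[0].history);

This matches the intuition above that the filtering stage belongs after the reduce/sorting stage.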

Comment by Wenbin Zhu [ 01/Jun/21 ]

lingzhi.deng pavithra.vetriselvan We might need to investigate this one.
