// Patch overview. Four files are touched; every statement below is read directly off the hunks.
//
// jstests/concurrency/fsm_all_replication.js
//   * Adds a `whitelist` array containing only 'update_and_bulk_insert.js' (prefixed with `dir`).
//   * The filter handed to runWorkloadsSerially changes from "file is NOT in blacklist" to
//     "file IS in whitelist", so only the whitelisted workload is selected.
//
// jstests/concurrency/fsm_libs/cluster.js
//   * Default for options.replication.numNodes changes from 3 to 1.
//   * oplogSize changes from 1024 to 10 (the removed comment gave the unit as MB); the added
//     comment states the goal is to increase the rate of oplog truncation.
//   * Adds a per-node entry `n0` with wiredTigerEngineConfigString "cache_size=10MB".
//   * nodeOptions gains setParameter: {logComponentVerbosity: tojson({})}.
//   * NOTE(review): replSetConfig now has two `nodes` keys: the pre-existing
//     `nodes: makeReplSetTestConfig(...)` context line and the added `nodes: {n0: ...}`.
//     In a JavaScript object literal the later duplicate key wins, so the first value
//     looks dead. Confirm this override is intended.
//   * NOTE(review): the added comment refers to "the secondaries" while the cache setting is
//     attached to n0 only; presumably that is why just n0 gets it. Confirm.
//
// jstests/concurrency/fsm_workloads/update_and_bulk_insert.js
//   * Adds `largeStr`, a string of "x" repeated 10 * 1024 times.
//   * insert state: the 100-document unordered bulk insert is replaced by a single insert of
//     {_id: this.tid}.
//   * update state: the query changes from {} with {multi: true} to {_id: this.tid} with no
//     options argument, and the update additionally $sets `str` to largeStr.
//   * transitions become insert -> update and update -> update, each with weight 1.
//   * threadCount changes from 5 to 500 and iterations from 50 to 1000.
//
// src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
//   * Several oplog stone / truncation log statements change from LOG(1) or LOG(2) to LOG(0);
//     the "create new oplogStone" message additionally prints lastRecord.
//   * reclaimOplog gains a pre-truncation check: using a session obtained via
//     ru->getSessionCache()->getSession(), it advances a cursor once and requires that key to be
//     > _oplogStones->firstRecord and <= stone->lastRecord.
//   * The WriteConflictException handler gains `continue`, so the loop returns to its condition
//     instead of falling through to the new post-truncation check.
//   * reclaimOplog gains a post-truncation check: the first key read by a fresh cursor must be
//     > stone->lastRecord.
//   * NOTE(review): the invariant message label "TrunStart" is inconsistent with "TruncEnd";
//     likely a typo for "TruncStart".
//
diff --git a/jstests/concurrency/fsm_all_replication.js b/jstests/concurrency/fsm_all_replication.js index e8ee14fb51..07a30c8bb2 100644 --- a/jstests/concurrency/fsm_all_replication.js +++ b/jstests/concurrency/fsm_all_replication.js @@ -13,7 +13,13 @@ var blacklist = [ return dir + '/' + file; }); +var whitelist = [ + 'update_and_bulk_insert.js', +].map(function(file) { + return dir + '/' + file; +}); + runWorkloadsSerially(ls(dir).filter(function(file) { - return !Array.contains(blacklist, file); + return Array.contains(whitelist, file); }), {replication: {enabled: true}}); diff --git a/jstests/concurrency/fsm_libs/cluster.js b/jstests/concurrency/fsm_libs/cluster.js index 287ea8b428..0b4d3a42df 100644 --- a/jstests/concurrency/fsm_libs/cluster.js +++ b/jstests/concurrency/fsm_libs/cluster.js @@ -71,7 +71,7 @@ var Cluster = function(options) { "Must have replication.enabled be true if 'replication.numNodes' is specified"); } - options.replication.numNodes = options.replication.numNodes || 3; + options.replication.numNodes = options.replication.numNodes || 1; assert.eq('number', typeof options.replication.numNodes); options.sameCollection = options.sameCollection || false; @@ -296,9 +296,19 @@ var Cluster = function(options) { } else if (options.replication.enabled) { var replSetConfig = { nodes: makeReplSetTestConfig(options.replication.numNodes), - // Increase the oplog size (in MB) to prevent rollover during write-heavy workloads - oplogSize: 1024, - nodeOptions: {verbose: verbosityLevel} + // Use a tiny oplog to increase the rate of oplog truncation. + oplogSize: 10, + nodes: { + n0: { + // Using a small cache size on the secondaries appeared to significantly + // slow down oplog application. 
+ wiredTigerEngineConfigString: "cache_size=10MB", + }, + }, + nodeOptions: { + verbose: verbosityLevel, + setParameter: {logComponentVerbosity: tojson({})}, + }, }; var rst = new ReplSetTest(replSetConfig); diff --git a/jstests/concurrency/fsm_workloads/update_and_bulk_insert.js b/jstests/concurrency/fsm_workloads/update_and_bulk_insert.js index dde8b4b709..25d7e3ba2c 100644 --- a/jstests/concurrency/fsm_workloads/update_and_bulk_insert.js +++ b/jstests/concurrency/fsm_workloads/update_and_bulk_insert.js @@ -12,17 +12,15 @@ */ var $config = (function() { + var largeStr = "x".repeat(10 * 1024); + var states = { insert: function insert(db, collName) { - var bulk = db[collName].initializeUnorderedBulkOp(); - for (var i = 0; i < 100; ++i) { - bulk.insert({}); - } - assert.writeOK(bulk.execute()); + assert.writeOK(db[collName].insert({_id: this.tid})); }, update: function update(db, collName) { - var res = db[collName].update({}, {$inc: {n: 1}}, {multi: true}); + var res = db[collName].update({_id: this.tid}, {$inc: {n: 1}, $set: {str: largeStr}}); assertAlways.lte(0, res.nMatched, tojson(res)); if (db.getMongo().writeMode() === 'commands') { assertAlways.eq(res.nMatched, res.nModified, tojson(res)); @@ -31,11 +29,11 @@ var $config = (function() { } }; - var transitions = {insert: {insert: 0.2, update: 0.8}, update: {insert: 0.2, update: 0.8}}; + var transitions = {insert: {update: 1}, update: {update: 1}}; return { - threadCount: 5, - iterations: 50, + threadCount: 500, + iterations: 1000, startState: 'insert', states: states, transitions: transitions diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index ac302cb492..0da0f65685 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -222,7 +222,8 @@ void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRec return; } - LOG(2) 
<< "create new oplogStone, current stones:" << _stones.size(); + LOG(0) << "create new oplogStone, current stones:" << _stones.size() + << " last record: " << lastRecord; OplogStones::Stone stone = {_currentRecords.swap(0), _currentBytes.swap(0), lastRecord}; _stones.push_back(stone); @@ -322,7 +323,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationCon _currentRecords.addAndFetch(1); int64_t newCurrentBytes = _currentBytes.addAndFetch(record->data.size()); if (newCurrentBytes >= _minBytesPerStone) { - LOG(1) << "Placing a marker at optime " + LOG(0) << "Placing a marker at optime " << Timestamp(record->id.repr()).toStringPretty(); OplogStones::Stone stone = {_currentRecords.swap(0), _currentBytes.swap(0), record->id}; @@ -1051,11 +1052,29 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx) { while (auto stone = _oplogStones->peekOldestStoneIfNeeded()) { invariant(stone->lastRecord.isNormal()); - LOG(1) << "Truncating the oplog between " << _oplogStones->firstRecord << " and " + LOG(0) << "Truncating the oplog between " << _oplogStones->firstRecord << " and " << stone->lastRecord << " to remove approximately " << stone->records << " records totaling to " << stone->bytes << " bytes"; WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(opCtx); + { + // The first record in the oplog should be within the truncate range. + auto validatingSession = ru->getSessionCache()->getSession(); + WT_CURSOR* cursor = validatingSession->getCursor(_uri, _tableId, true); + ON_BLOCK_EXIT([&] { validatingSession->releaseCursor(_tableId, cursor); }); + int ret = WT_READ_CHECK(cursor->next(cursor)); + invariantWTOK(ret); + RecordId firstRecord = getKey(cursor); + invariant(firstRecord > _oplogStones->firstRecord && firstRecord <= stone->lastRecord, + str::stream() + << "First oplog record is not in truncation range. 
FirstRecord: " + << firstRecord + << " TrunStart: " + << _oplogStones->firstRecord + << " TruncEnd: " + << stone->lastRecord); + } + WT_SESSION* session = ru->getSession()->getSession(); try { @@ -1081,11 +1100,29 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx) { // Stash the truncate point for next time to cleanly skip over tombstones, etc. _oplogStones->firstRecord = stone->lastRecord; } catch (const WriteConflictException&) { - LOG(1) << "Caught WriteConflictException while truncating oplog entries, retrying"; + LOG(0) << "Caught WriteConflictException while truncating oplog entries, retrying"; + continue; + } + + { + // After truncation, the first record in the oplog should be outside the truncation + // range. + auto validatingSession = ru->getSessionCache()->getSession(); + WT_CURSOR* cursor = validatingSession->getCursor(_uri, _tableId, true); + ON_BLOCK_EXIT([&] { validatingSession->releaseCursor(_tableId, cursor); }); + int ret = WT_READ_CHECK(cursor->next(cursor)); + invariantWTOK(ret); + RecordId firstRecord = getKey(cursor); + invariant(firstRecord > stone->lastRecord, + str::stream() + << "First oplog record is still inside truncation range. FirstRecord: " + << firstRecord + << " TruncEnd: " + << stone->lastRecord); } } - LOG(1) << "Finished truncating the oplog, it now contains approximately " << _numRecords.load() + LOG(0) << "Finished truncating the oplog, it now contains approximately " << _numRecords.load() << " records totaling to " << _dataSize.load() << " bytes"; }