commit c9190c9d13c8f3556a0fa0438ac177255bcce13d
Author: Minji
Date:   Thu Jul 12 11:26:00 2018 -0400

    SERVER-20445 Add support for majority read concern level to MapReduce and Aggregation

diff --git a/jstests/noPassthrough/read_majority_out.js b/jstests/noPassthrough/read_majority_out.js
new file mode 100644
index 0000000..ed8d0d7
--- /dev/null
+++ b/jstests/noPassthrough/read_majority_out.js
@@ -0,0 +1,166 @@
+/**
+ * Tests that an aggregation with a $out stage and readConcern level "majority" only reads
+ * committed data.
+ *
+ * The aggregation is run on a single node, and through mongos on both sharded and unsharded
+ * collections. Mongos doesn't directly handle readConcern majority, but these tests should ensure
+ * that it correctly propagates the setting to the shards when running commands.
+ * @tags: [requires_sharding]
+ */
+
+(function() {
+    'use strict';
+
+    // Skip this test if running with --nojournal and WiredTiger.
+    if (jsTest.options().noJournal &&
+        (!jsTest.options().storageEngine || jsTest.options().storageEngine === "wiredTiger")) {
+        print("Skipping test because running WiredTiger without journaling isn't a valid" +
+              " replica set configuration");
+        return;
+    }
+
+    var testServer = MongoRunner.runMongod();
+    var db = testServer.getDB("test");
+    if (!db.serverStatus().storageEngine.supportsCommittedReads) {
+        print("Skipping read_majority_out.js since storageEngine doesn't support it.");
+        MongoRunner.stopMongod(testServer);
+        return;
+    }
+    MongoRunner.stopMongod(testServer);
+
+    function makeCursor(db, result) {
+        return new DBCommandCursor(db, result);
+    }
+
+    // These test cases are functions that return a cursor of the documents in collections without
+    // fetching them yet.
+    var cursorTestCases = {
+        aggregate: function(coll, finishColl) {
+            assert.commandWorked(coll.runCommand('aggregate', {
+                readConcern: {level: 'majority'},
+                cursor: {},
+                pipeline: [{$match: {_id: 1}}, {$out: finishColl.getName()}]
+            }));
+            jsTestLog(tojson(coll.findOne({_id: 1})));
+            jsTestLog(tojson(finishColl.findOne()));
+            return makeCursor(
+                finishColl.getDB(),
+                assert.commandWorked(
+                    finishColl.runCommand('find', {readConcern: {level: 'local'}})));
+        }
+        /*
+        aggregate: function(coll) {
+            return makeCursor(
+                coll.getDB(),
+                assert.commandWorked(coll.runCommand('aggregate', {
+                    readConcern: {level: 'majority'},
+                    cursor: {batchSize: 0},
+                    pipeline: [{$match: {_id: 1}}]
+                })));
+        }*/
+    };
+
+    function runTests(coll, finishColl, mongodConnection) {
+        function makeSnapshot() {
+            return assert.commandWorked(mongodConnection.adminCommand("makeSnapshot")).name;
+        }
+        function setCommittedSnapshot(snapshot) {
+            assert.commandWorked(mongodConnection.adminCommand({"setCommittedSnapshot": snapshot}));
+        }
+
+        for (var testName in cursorTestCases) {
+            jsTestLog('Running ' + testName + ' against ' + coll.toString());
+            var getCursor = cursorTestCases[testName];
+            // assert.commandWorked(coll.getDB().adminCommand({setParameter: 1, traceExceptions: true}));
+
+            // Setup initial state.
+            assert.writeOK(coll.remove({}));
+            for (var i = 0; i < 10; i++)
+                coll.insert({a: i});
+            assert.writeOK(coll.save({_id: 1, state: 'before'}));
+            setCommittedSnapshot(makeSnapshot());
+
+            // Check initial conditions.
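+            // With 'testingSnapshotBehaviorInIsolation' set on the mongod, majority reads only
+            // observe data up to the snapshot most recently marked committed with the
+            // 'setCommittedSnapshot' test command, so the $out aggregation below should see the
+            // committed 'before' version of the document.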
+            // jsTestLog(tojson(getCursor(coll, finishColl).next().state));
+            assert.eq(getCursor(coll, finishColl).next().state, 'before');
+            /*
+            // Change state without making it committed.
+            assert.writeOK(finishColl.save({_id: 1, state: 'after'}));
+
+            // Cursor still sees old state.
+            assert.eq(getCursor(coll, finishColl).next().state, 'before');
+
+            // Create a cursor before the update is visible.
+            var oldCursor = getCursor(coll, finishColl);
+
+            // Making a snapshot doesn't make the update visible yet.
+            var snapshot = makeSnapshot();
+            assert.eq(getCursor(coll, finishColl).next().state, 'before');
+
+            // Setting it as committed does for both new and old cursors.
+            setCommittedSnapshot(snapshot);
+            assert.eq(getCursor(coll, finishColl).next().state, 'after');
+            assert.eq(oldCursor.next().state, 'after');
+            */
+        }
+
+        assert.commandWorked(coll.ensureIndex({point: '2dsphere'}));
+        assert.commandWorked(coll.ensureIndex({point: 'geoHaystack', _id: 1}, {bucketSize: 1}));
+    }
+
+    var replTest = new ReplSetTest({
+        nodes: 1,
+        oplogSize: 2,
+        nodeOptions: {
+            setParameter: 'testingSnapshotBehaviorInIsolation=true',
+            enableMajorityReadConcern: '',
+            shardsvr: '',
+            vvv: ""
+        }
+    });
+    replTest.startSet();
+    // Cannot wait for a stable checkpoint with 'testingSnapshotBehaviorInIsolation' set.
+    replTest.initiateWithAnyNodeAsPrimary(
+        null, "replSetInitiate", {doNotWaitForStableCheckpoint: true});
+
+    var mongod = replTest.getPrimary();
+    assert.commandWorked(
+        mongod.getDB("admin").runCommand({setParameter: 1, traceExceptions: true}));
+
+    (function testSingleNode() {
+        var db = mongod.getDB("singleNode");
+        runTests(db.collection, db.finishColl, mongod);
+    })();
+
+    var shardingTest = new ShardingTest({
+        shards: 0,
+        mongos: 1,
+    });
+    assert(shardingTest.adminCommand({addShard: replTest.getURL()}));
+
+    (function testUnshardedDBThroughMongos() {
+        var db = shardingTest.getDB("throughMongos");
+        runTests(db.unshardedDB, db.finishColl, mongod);
+    })();
+
+    shardingTest.adminCommand({enableSharding: 'throughMongos'});
+
+    (function testUnshardedCollectionThroughMongos() {
+        var db = shardingTest.getDB("throughMongos");
+        runTests(db.unshardedCollection, db.finishColl, mongod);
+    })();
+
+    (function testShardedCollectionThroughMongos() {
+        var db = shardingTest.getDB("throughMongos");
+        var collection = db.shardedCollection;
+        shardingTest.adminCommand({shardCollection: collection.getFullName(), key: {_id: 1}});
+        runTests(collection, db.finishColl, mongod);
+    })();
+
+    shardingTest.stop();
+    replTest.stopSet();
+})();
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 71644eb..34bd8eb 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -223,12 +223,6 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
             str::stream() << "$out only supports a string argument, not "
                           << typeName(elem.type()),
             elem.type() == String);
-    auto readConcernLevel = repl::ReadConcernArgs::get(pExpCtx->opCtx).getLevel();
-    uassert(ErrorCodes::InvalidOptions,
-            "$out can not be used with either a 'majority' or 'snapshot' read concern level",
-            readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern &&
-                readConcernLevel != repl::ReadConcernLevel::kSnapshotReadConcern);
-
     NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str());
     uassert(17385, "Can't $out to special collection: " + elem.str(), !outputNs.isSpecial());
     return new DocumentSourceOut(outputNs, pExpCtx);
diff --git a/src/mongo/db/storage/kv/kv_catalog.cpp b/src/mongo/db/storage/kv/kv_catalog.cpp
index 3fcab58..79ae5b0 100644
--- a/src/mongo/db/storage/kv/kv_catalog.cpp
+++ b/src/mongo/db/storage/kv/kv_catalog.cpp
@@ -534,6 +534,7 @@ Status KVCatalog::renameCollection(OperationContext* opCtx,
                                     bool stayTemp) {
     RecordId loc;
     BSONObj old = _findEntry(opCtx, fromNS, &loc).getOwned();
+    LOG(3) << "minji: " << old;
 
     {
         BSONObjBuilder b;
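
For reference, a minimal mongo shell sketch of what this change enables (the 'source'/'target' collection names are hypothetical, and a replica set started with enableMajorityReadConcern is assumed): with the readConcern check removed from DocumentSourceOut::createFromBson, an aggregate command ending in $out is no longer rejected at readConcern level 'majority'.

    // Hypothetical usage sketch; not part of this patch.
    var coll = db.getSiblingDB("test").source;
    assert.commandWorked(coll.runCommand('aggregate', {
        pipeline: [{$match: {state: 'before'}}, {$out: 'target'}],
        cursor: {},
        readConcern: {level: 'majority'}  // Previously failed with InvalidOptions.
    }));

The jstest above drives this same path through both a plain mongod and mongos, using the makeSnapshot/setCommittedSnapshot test commands to control which writes count as committed.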