diff --git a/jstests/sharding/union_with_read_preference.js b/jstests/sharding/union_with_read_preference.js
new file mode 100644
index 0000000000..d13266e133
--- /dev/null
+++ b/jstests/sharding/union_with_read_preference.js
@@ -0,0 +1,169 @@
+// Tests that sub-queries across shards as part of $unionWith will obey the read preference
+// specified by the user.
+(function() {
+"use strict";
+
+load('jstests/libs/profiler.js');  // For various profiler helpers.
+load("jstests/aggregation/extras/utils.js");  // For arrayEq().
+
+// For supportsMajorityReadConcern.
+load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+// This test only works on storage engines that support committed reads; skip it if the
+// configured engine doesn't support them.
+if (!supportsMajorityReadConcern()) {
+    jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+    return;
+}
+
+const st = new ShardingTest({name: "union_with_read_pref", mongos: 2, shards: 2, rs: {nodes: 2}});
+
+const dbName = jsTestName() + "_db";
+
+// In this test we perform writes which we expect to read on a secondary, so we need to enable
+// causal consistency.
+st.s0.setCausalConsistency(true);
+const mongosDB = st.s0.getDB(dbName);
+
+const mongosColl = mongosDB[jsTestName()];
+const unionedColl = mongosDB.union_target;
+
+// Shard the test collection on _id with 2 chunks: [MinKey, 0), [0, MaxKey].
+st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 0});
+// Shard the union's target collection on _id with the same chunks, but moving the negative chunk
+// off the primary shard so their distributions are flipped.
+st.shardColl(unionedColl, {_id: 1}, {_id: 0}, {_id: -1});
+
+// Turn on the profiler.
+for (let rs of [st.rs0, st.rs1]) {
+    const primary = rs.getPrimary();
+    const secondary = rs.getSecondary();
+    assert.commandWorked(primary.getDB(dbName).setProfilingLevel(2, -1));
+    assert.commandWorked(
+        primary.adminCommand({setParameter: 1, logComponentVerbosity: {query: {verbosity: 3}}}));
+    assert.commandWorked(secondary.getDB(dbName).setProfilingLevel(2, -1));
+    assert.commandWorked(
+        secondary.adminCommand({setParameter: 1, logComponentVerbosity: {query: {verbosity: 3}}}));
+}
+
+// Write a document to each chunk.
+assert.commandWorked(mongosColl.insert([{_id: -1, coll: 0}, {_id: 1, coll: 0}], {writeConcern: {w: "majority"}}));
+assert.commandWorked(unionedColl.insert([{_id: -1, coll: 1}, {_id: 1, coll: 1}], {writeConcern: {w: "majority"}}));
+
+// Test that $unionWith goes to the primary by default.
+let unionWithComment = "union against primary";
+assert.eq(mongosColl
+              .aggregate([{$unionWith: unionedColl.getName()}, {$sort: {_id: 1}}],
+                         {comment: unionWithComment})
+              .toArray(),
+          [{_id: -1, coll: 0}, {_id: -1, coll: 1}, {_id: 1, coll: 0}, {_id: 1, coll: 1}]);
+
+// Test that the union's sub-pipelines go to the primary.
+for (let rs of [st.rs0, st.rs1]) {
+    const primaryDB = rs.getPrimary().getDB(dbName);
+    profilerHasSingleMatchingEntryOrThrow({
+        profileDB: primaryDB,
+        filter: {
+            ns: unionedColl.getFullName(),
+            op: {$ne: "getmore"},
+            "command.comment": unionWithComment,
+        }
+    });
+}
+
+// Test that $unionWith sub-pipelines go to the secondary when the readPreference is
+// {mode: "secondary"}.
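+// The causally consistent session and the {w: "majority"} writes above mean that a
+// readConcern "majority" read on a secondary is expected to observe the documents just inserted.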
+unionWithComment = 'union against secondary';
+assert.eq(mongosColl
+              .aggregate([{$unionWith: unionedColl.getName()}, {$sort: {_id: 1}}], {
+                  comment: unionWithComment,
+                  $readPreference: {mode: "secondary"},
+                  readConcern: {level: "majority"}
+              })
+              .toArray(),
+          [{_id: -1, coll: 0}, {_id: -1, coll: 1}, {_id: 1, coll: 0}, {_id: 1, coll: 1}]);
+
+// Test that the union's sub-pipelines go to the secondary.
+for (let rs of [st.rs0, st.rs1]) {
+    const secondaryDB = rs.getSecondary().getDB(dbName);
+    profilerHasSingleMatchingEntryOrThrow({
+        profileDB: secondaryDB,
+        filter: {
+            ns: unionedColl.getFullName(),
+            op: {$ne: "getmore"},
+            "command.comment": unionWithComment,
+            // We need to filter out any profiler entries with a stale config - this is the first
+            // read on this secondary with a readConcern specified, so it is the first read on this
+            // secondary that will enforce shard version.
+            errCode: {$ne: ErrorCodes.StaleConfig}
+        }
+    });
+}
+
+// Now a more extreme test: add a nested $unionWith and a more complicated sub-pipeline to ensure
+// that any sub-operation always goes to the secondary if the read preference is secondary.
+jsTestLog("TEDLOG complex union against secondary");
+const secondTargetColl = mongosDB.second_union_target;
+st.shardColl(secondTargetColl, {_id: 1}, {_id: 0}, {_id: -1});
+assert.commandWorked(
+    secondTargetColl.insert([{_id: -1, coll: 2}, {_id: 1, coll: 2}], {writeConcern: {w: "majority"}}));
+assert.eq(secondTargetColl.find().readPref("secondary").itcount(), 2);
+unionWithComment = 'complex union against secondary';
+let runAgg = () => mongosColl
+                       .aggregate(
+                           [
+                               {
+                                   $unionWith: {
+                                       coll: unionedColl.getName(),
+                                       pipeline: [
+                                           {$unionWith: secondTargetColl.getName()},
+                                           {$group: {_id: "$_id", count: {$sum: 1}}}
+                                       ]
+                                   }
+                               },
+                               {$group: {_id: "$_id", count: {$sum: {$ifNull: ["$count", 1]}}}},
+                               {$sort: {_id: 1}}
+                           ],
+                           {
+                               comment: unionWithComment,
+                               $readPreference: {mode: "secondary"},
+                               readConcern: {level: "majority"}
+                           })
+                       .toArray();
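+// Each _id appears once in each of the three collections: the inner $group collapses the two
+// unioned collections into one document per _id with count: 2, and the outer $group adds 1 for the
+// matching document from mongosColl, so every _id is expected to end up with count: 3.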
+assert.eq(runAgg(), [{_id: -1, count: 3}, {_id: 1, count: 3}]);
+
+// Test that the union's sub-pipelines go to the secondary.
+for (let rs of [st.rs0, st.rs1]) {
+    jsTestLog(`Testing profile on shard ${rs.getURL()}`);
+    const secondaryDB = rs.getSecondary().getDB(dbName);
+    profilerHasSingleMatchingEntryOrThrow({
+        profileDB: secondaryDB,
+        filter: {
+            ns: unionedColl.getFullName(),
+            op: {$ne: "getmore"},
+            "command.comment": unionWithComment,
+            // We need to filter out any profiler entries with a stale config - this is the first
+            // read on this secondary with a readConcern specified, so it is the first read on this
+            // secondary that will enforce shard version.
+            errCode: {$ne: ErrorCodes.StaleConfig}
+        }
+    });
+    profilerHasSingleMatchingEntryOrThrow({
+        profileDB: secondaryDB,
+        filter: {
+            ns: secondTargetColl.getFullName(),
+            op: {$ne: "getmore"},
+            "command.comment": unionWithComment,
+            // We need to filter out any profiler entries with a stale config - this is the first
+            // read on this secondary with a readConcern specified, so it is the first read on this
+            // secondary that will enforce shard version.
+            errCode: {$ne: ErrorCodes.StaleConfig}
+        }
+    });
+}
+
+assert.soon(() => arrayEq(runAgg(), [{_id: -1, count: 3}, {_id: 1, count: 3}]));
+
+st.stop();
+}());
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 0f75917dc4..b4716e68b3 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -175,6 +175,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
     if (_executionState == ExecutionProgress::kIteratingSource) {
         auto nextInput = pSource->getNext();
         if (!nextInput.isEOF()) {
+            LOGV2_WARNING(4661202, "TEDLOG unionWith returning doc from source {doc}", "doc"_attr = nextInput.getDocument().toString());
             return nextInput;
         }
         _executionState = ExecutionProgress::kStartingSubPipeline;
@@ -209,8 +210,10 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
     }
 
     auto res = _pipeline->getNext();
-    if (res)
+    if (res) {
+        LOGV2_WARNING(4661201, "TEDLOG unionWith returning doc from union {doc}", "doc"_attr = res->toString());
         return std::move(*res);
+    }
 
     _executionState = ExecutionProgress::kFinished;
     return GetNextResult::makeEOF();
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index f8144df09b..468e268b0c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -164,8 +164,8 @@ std::vector<RemoteCursor> establishShardCursors(
     const ReadPreferenceSetting& readPref) {
     LOGV2_DEBUG(20904,
                 1,
-                "Dispatching command {cmdObj} to establish cursors on shards",
-                "cmdObj"_attr = redact(cmdObj));
+                "Dispatching command {cmdObj} to establish cursors on shards with readPref {readPref}",
+                "cmdObj"_attr = redact(cmdObj), "readPref"_attr = readPref.toContainingBSON().toString(false));
 
     const bool mustRunOnAll = mustRunOnAllShards(nss, hasChangeStream);
     std::vector<std::pair<ShardId, BSONObj>> requests;
@@ -190,6 +190,7 @@ std::vector<RemoteCursor> establishShardCursors(
     } else {
         // The collection is unsharded. Target only the primary shard for the database.
         // Don't append shard version info when contacting the config servers.
+        LOGV2_WARNING(4661208, "TEDLOG found ns {ns} unsharded", "ns"_attr = nss.ns());
         const auto cmdObjWithShardVersion = !routingInfo->db().primary()->isConfig() ?
             appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) :
             cmdObj;
@@ -795,6 +796,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
     const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream);
     std::set<ShardId> shardIds = getTargetedShards(
        opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, collationObj);
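+    // Debug logging (TEDLOG): record how many shards the pipeline was targeted to, the first
+    // targeted shard, and the namespace being dispatched.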
+    LOGV2_WARNING(4661203, "TEDLOG targeting # shards: {shards}, first targeted: {first} for ns: {ns}", "shards"_attr = shardIds.size(), "first"_attr = shardIds.begin()->toString(), "ns"_attr = expCtx->ns);
 
     // Don't need to split the pipeline if we are only targeting a single shard, unless:
     // - There is a stage that needs to be run on the primary shard and the single target shard
@@ -1136,6 +1138,10 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
     invariant(pipeline->getSources().empty() ||
               !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
 
+    // auto routingInfo = uassertStatusOK(
+    //     Grid::get(expCtx->opCtx)
+    //         ->catalogCache()
+    //         ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, expCtx->ns, true));
     auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
     return shardVersionRetry(
         expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() {
@@ -1151,10 +1157,22 @@
 }
 
 void logFailedRetryAttempt(StringData taskDescription, const DBException& exception) {
-    LOGV2_DEBUG(4553800,
-                3,
-                "Retrying {task_description}. Got error: {exception}",
+    // LOGV2_DEBUG(4553800,
+    //             3,
+    //             "Retrying {task_description}. Got error: {exception}",
+    //             "task_description"_attr = taskDescription,
+    //             "exception"_attr = exception);
+    LOGV2_WARNING(4553800,
+                  "TEDLOG Retrying {task_description}. Got error: {exception}",
                 "task_description"_attr = taskDescription,
                 "exception"_attr = exception);
 }
+
+void logCollVersionInfo(OperationContext* opCtx, CatalogCache* cc, const NamespaceString nss) {
+    auto chunkManager = cc->getCollectionRoutingInfo(opCtx, nss).getValue().cm();
+    LOGV2_WARNING(4661209, "TEDLOG {db} cm null? {nullptr}", "db"_attr = nss.ns(), "nullptr"_attr = chunkManager == nullptr);
+    if (chunkManager != nullptr) {
+        LOGV2_WARNING(4661210, "TEDLOG coll version is {v}", "v"_attr = chunkManager->getVersion());
+    }
+}
 } // namespace mongo::sharded_agg_helpers
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 3457bff961..f3e799426e 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -198,6 +198,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
  * header file.
  */
 void logFailedRetryAttempt(StringData taskDescription, const DBException&);
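+// Debugging helper: logs whether the catalog cache currently has a chunk manager for 'nss' and,
+// if so, the cached collection version. Called from the shardVersionRetry() loop below.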
+void logCollVersionInfo(OperationContext* opCtx, CatalogCache* cc, const NamespaceString nss);
 
 /**
  * A retry loop which handles errors in ErrorCategory::StaleShardVersionError. When such an error is
@@ -223,6 +224,7 @@ auto shardVersionRetry(OperationContext* opCtx,
         return false;
     };
     while (true) {
+        logCollVersionInfo(opCtx, catalogCache, nss);
         catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, numAttempts);
         try {
             return callbackFn();
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index 28f326e0c5..284573322e 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -95,14 +95,25 @@ std::unique_ptr DocumentSourceMergeCursors::convertToRouterSta
 
 DocumentSource::GetNextResult DocumentSourceMergeCursors::doGetNext() {
     if (!_blockingResultsMerger) {
-        populateMerger();
+        // try {
+        populateMerger();
+        // Grid::get(getGlobalServiceContext())->shardRegistry()->reload(pExpCtx->opCtx);
+        // } catch (DBException& ex) {
+        //     ex.addContext("Failed to populate BlockingResultsMerger");
+        //     throw;
+        // }
     }
 
-    auto next = uassertStatusOK(_blockingResultsMerger->next(pExpCtx->opCtx, _execContext));
-    if (next.isEOF()) {
-        return GetNextResult::makeEOF();
+    try {
+        auto next = uassertStatusOK(_blockingResultsMerger->next(pExpCtx->opCtx, _execContext));
+        if (next.isEOF()) {
+            return GetNextResult::makeEOF();
+        }
+        return Document::fromBsonWithMetaData(*next.getResult());
+    } catch (DBException& ex) {
+        ex.addContext("Failed to execute $mergeCursors");
+        throw;
     }
-    return Document::fromBsonWithMetaData(*next.getResult());
 }
 
 Value DocumentSourceMergeCursors::serialize(