From 35b7618537194999d03f1f2f8096e72d890b23b8 Mon Sep 17 00:00:00 2001
From: Jordi Serra Torrens
Date: Tue, 5 Apr 2022 13:38:38 +0000
Subject: [PATCH] Repro SERVER-65259

---
 jstests/sharding/repro-server-65259.js        | 82 +++++++++++++++++++
 .../s/query/cluster_aggregation_planner.cpp   |  7 ++++++-
 2 files changed, 88 insertions(+), 1 deletion(-)
 create mode 100644 jstests/sharding/repro-server-65259.js

diff --git a/jstests/sharding/repro-server-65259.js b/jstests/sharding/repro-server-65259.js
new file mode 100644
index 00000000000..78aecb7e11c
--- /dev/null
+++ b/jstests/sharding/repro-server-65259.js
@@ -0,0 +1,82 @@
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallel_shell_helpers.js');
+
+const st = new ShardingTest({shards: 2});
+
+const dbName = "test";
+const collName = "foo";
+const ns = dbName + "." + collName;
+
+let db = st.s.getDB(dbName);
+let coll = db[collName];
+
+assert.commandWorked(
+    st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+
+assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 0}}));
+assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {x: 1}, to: st.shard1.shardName}));
+
+assert.commandWorked(coll.insert({x: -2}));
+assert.commandWorked(coll.insert({x: -1}));
+assert.commandWorked(coll.insert({x: 1}));
+assert.commandWorked(coll.insert({x: 2}));
+
+// Start an aggregation that requires merging on a shard. Let it run until the shard cursors have
+// been established, but make it hang right before opening the merge cursor.
+let shardedAggregateHangBeforeDispatchMergingPipelineFP =
+    configureFailPoint(st.s, "shardedAggregateHangBeforeDispatchMergingPipeline");
+let awaitAggregationShell = startParallelShell(() => {
+    assert.eq(0, db.getSiblingDB('test')['foo'].aggregate([{$out: 'foo.out'}]).itcount());
+}, st.s.port);
+shardedAggregateHangBeforeDispatchMergingPipelineFP.wait();
+
+// Start a chunk migration and let it run until it enters the critical section.
+let hangBeforePostMigrationCommitRefresh =
+    configureFailPoint(st.shard0, "hangBeforePostMigrationCommitRefresh");
+let awaitMoveChunkShell = startParallelShell(
+    funWithArgs(function(toShard) {
+        assert.commandWorked(db.adminCommand({moveChunk: 'test.foo', find: {x: -1}, to: toShard}));
+    }, st.shard1.shardName), st.s.port);
+hangBeforePostMigrationCommitRefresh.wait();
+sleep(5 * 1000);
+
+// Let the aggregation continue and try to establish the merge cursor (it will first fail because
+// the shard is in the critical section; mongos will transparently retry).
+shardedAggregateHangBeforeDispatchMergingPipelineFP.off();
+sleep(1 * 1000);
+
+// Let the migration exit the critical section and complete.
+hangBeforePostMigrationCommitRefresh.off();
+
+// The aggregation will be able to complete now.
+awaitAggregationShell();
+jsTest.log("--XXXX-- Aggregation finished");
+
+awaitMoveChunkShell();
+
+// Did any cursor leak?
+sleep(5 * 1000);
+const idleCursors = st.shard0.getDB('admin')
+                        .aggregate([
+                            {$currentOp: {idleCursors: true, allUsers: true}},
+                            {$match: {type: 'idleCursor', ns: ns}}
+                        ])
+                        .toArray();
+assert.eq(0, idleCursors.length, "Found idle cursors: " + tojson(idleCursors));  // <== Will fail here!
+
+// Check that range deletions can be completed (if a cursor was left open, the range deletion would
+// not finish).
+assert.soon(
+    () => {
+        return st.shard0.getDB('config')['rangeDeletions'].find().itcount() === 0;
+    },
+    "Range deletion tasks did not finish: " +
+        tojson(st.shard0.getDB('config')['rangeDeletions'].find().toArray()),
+    10000);
+
+st.stop();
+})();
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 4e567e542ff..3c4358028ee 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -65,6 +65,7 @@ namespace cluster_aggregation_planner {
 
 MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline);
 MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor);
+MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeDispatchMergingPipeline);
 
 using sharded_agg_helpers::DispatchShardPipelineResults;
 using sharded_agg_helpers::SplitPipeline;
@@ -240,7 +241,9 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
     // attempt to kill the cursors.
     auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront());
     mergeCursors->dismissCursorOwnership();
-
+    LOGV2(00000,
+          "--XXXX-- dispatchMergingPipeline - dismissed cursor ownership",
+          "mergeCursorResponse"_attr = mergeCursorResponse);
     return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result);
 }
 
@@ -708,6 +711,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
             expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults);
     }
 
+    shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet();
+
     // If we reach here, we have a merge pipeline to dispatch.
     return dispatchMergingPipeline(expCtx,
                                    namespaces,
-- 
2.17.1
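
Note: a typical way to run this repro from a MongoDB source checkout is through
resmoke (the exact suite name here is an assumption; any sharding-fixture suite
that picks up the file should work):

    python3 buildscripts/resmoke.py run --suites=sharding jstests/sharding/repro-server-65259.js

If SERVER-65259 reproduces, the test fails at the assertion marked
"<== Will fail here!": an idle cursor is left open on shard0, and the
corresponding range deletion task stays queued in config.rangeDeletions.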