[SERVER-19392] Don't check shard version on DocumentSourceCursor::loadBatch Created: 14/Jul/15  Updated: 06/Dec/22  Resolved: 16/Aug/19

Status: Closed
Project: Core Server
Component/s: Aggregation Framework
Affects Version/s: 3.0.4, 3.1.5
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: James Wahlin Assignee: Backlog - Query Team (Inactive)
Resolution: Done Votes: 0
Labels: query-44-grooming
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Assigned Teams:
Query
Operating System: ALL
Sprint: QuInt 8 08/28/15, QuInt A (10/12/15), QuInt B (11/02/15)
Participants:

 Description   

DocumentSourceCursor::loadBatch calls AutoGetCollectionForRead to obtain a collection lock. This has the unintended side effect of checking the shard version, which was already checked during PlanExecutor creation.
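
For reference, here is an abridged sketch of the relevant portion of loadBatch() as it stood in the affected versions, drawn from the diff context quoted in the comments below; surrounding details are omitted.

void DocumentSourceCursor::loadBatch() {
    // ...
    const NamespaceString nss(_ns);

    // Taking the collection lock via AutoGetCollectionForRead also runs
    // ensureShardVersionOKOrThrow, even though the shard version was already
    // validated when the PlanExecutor was constructed.
    AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);

    _exec->restoreState();

    // ... iterate the PlanExecutor to fill the current batch ...
}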



 Comments   
Comment by David Storch [ 16/Aug/19 ]

It appears that this issue has been fixed. Right now in the master branch, DocumentSourceCursor::loadBatch() still uses AutoGetCollectionForRead, but the responsibility for checking the shard version has been moved to AutoGetCollectionForReadCommand. I'm closing this issue as "Gone Away".

CC charlie.swanson james.wahlin
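
For context, a rough illustration of the split described above, as of the master branch referenced here. This is simplified usage only, not the actual headers; opCtx and nss are placeholders for whatever the call sites hold.

// Illustrative usage contrast only (simplified).
// Command dispatch takes the shard-version-checking variant:
AutoGetCollectionForReadCommand readLockWithVersionCheck(opCtx, nss);

// DocumentSourceCursor::loadBatch() keeps using the plain variant, which now
// only takes locks and does not check the shard version:
AutoGetCollectionForRead readLockOnly(opCtx, nss);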

Comment by Charlie Swanson [ 13/Nov/15 ]

Assigning back to Jonathan. I was unable to reproduce after 2 days. I think in order to reliably reproduce any crash like this, we need to figure out what workload was running at the time of the crash. So please include the -L option, and upload the client logs if you are able to reproduce. Thanks!

Comment by Charlie Swanson [ 09/Nov/15 ]

ping. redbeard0531

Comment by Andy Schwerin [ 14/Oct/15 ]

I'd be comfortable with "3.3 Required".

Comment by Charlie Swanson [ 14/Oct/15 ]

Updating based on my knowledge. There are two bugs here we know about:

  1. The one described in this ticket, namely that we unnecessarily check the shard version each time we load a batch, which can throw a stale shard version error when it doesn't need to.
  2. The pipeline's cursor cannot be established until the PlanExecutor has been constructed, which can cause issues as described in SERVER-20909.

I was able to reproduce the first issue, but only by adding a sleep in the code and extending the fsm framework to enable the balancer. The patch below reproduces the issue more than half the time:

diff --git a/jstests/concurrency/fsm_all_sharded.js b/jstests/concurrency/fsm_all_sharded.js
index 0545c7b..b45d43b 100644
--- a/jstests/concurrency/fsm_all_sharded.js
+++ b/jstests/concurrency/fsm_all_sharded.js
@@ -74,6 +74,6 @@ var blacklist = [
 ].map(function(file) { return dir + '/' + file; });
 
 // SERVER-16196 re-enable executing workloads against sharded clusters
-// runWorkloadsSerially(ls(dir).filter(function(file) {
-//     return !Array.contains(blacklist, file);
-// }), { sharded: true, useLegacyConfigServers: false });
+runWorkloadsSerially(ls(dir).filter(function(file) {
+    return !Array.contains(blacklist, file) && file.indexOf("agg_exact_shard_match.js") !== -1;
+}), { sharded: true, useLegacyConfigServers: false });
diff --git a/jstests/concurrency/fsm_libs/cluster.js b/jstests/concurrency/fsm_libs/cluster.js
index 2d0a0f0..b5d906e 100644
--- a/jstests/concurrency/fsm_libs/cluster.js
+++ b/jstests/concurrency/fsm_libs/cluster.js
@@ -64,6 +64,7 @@ var Cluster = function(options) {
     }
 
     var conn;
+    var st;
 
     var initialized = false;
 
@@ -89,8 +90,9 @@ var Cluster = function(options) {
         if (options.sharded) {
             // TODO: allow 'options' to specify the number of shards and mongos processes
             var shardConfig = {
-                shards: 2,
+                shards: 5,
                 mongos: 2,
+                other: { chunksize: 1, enableBalancer: true },
                 // Legacy config servers are pre-3.2 style, 3-node non-replica-set config servers
                 sync: options.useLegacyConfigServers,
                 verbose: verbosityLevel
@@ -107,7 +109,7 @@ var Cluster = function(options) {
                 };
             }
 
-            var st = new ShardingTest(shardConfig);
+            st = new ShardingTest(shardConfig);
             st.stopBalancer();
 
             conn = st.s; // mongos
@@ -243,6 +245,10 @@ var Cluster = function(options) {
         return conn.getDB(dbName);
     };
 
+    this.startBalancer = function startBalancer() {
+        return st.startBalancer();
+    };
+
     this.getHost = function getHost() {
         if (!initialized) {
             throw new Error('cluster has not been initialized yet');
diff --git a/jstests/concurrency/fsm_workloads/agg_base.js b/jstests/concurrency/fsm_workloads/agg_base.js
index 3ce16aa..37bce83 100644
--- a/jstests/concurrency/fsm_workloads/agg_base.js
+++ b/jstests/concurrency/fsm_workloads/agg_base.js
@@ -26,7 +26,7 @@ var $config = (function() {
         };
     })();
 
-    function padDoc(doc, size) {
+    data.padDoc = function padDoc(doc, size) {
         // first set doc.padding so that Object.bsonsize will include the field name and other
         // overhead
         doc.padding = "";
@@ -36,7 +36,7 @@ var $config = (function() {
         doc.padding = getStringOfLength(paddingLength);
         assertAlways.eq(size, Object.bsonsize(doc));
         return doc;
-    }
+    };
 
     var states = {
         query: function query(db, collName) {
@@ -55,7 +55,7 @@ var $config = (function() {
         for (var i = 0; i < this.numDocs; ++i) {
             // note: padDoc caches the large string after allocating it once, so it's ok to call it
             // in this loop
-            bulk.insert(padDoc({
+            bulk.insert(this.padDoc({
                 flag: i % 2 ? true : false,
                 rand: Random.rand(),
                 randInt: Random.randInt(this.numDocs)
diff --git a/jstests/concurrency/fsm_workloads/agg_exact_shard_match.js b/jstests/concurrency/fsm_workloads/agg_exact_shard_match.js
new file mode 100644
index 0000000..240c3d6
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/agg_exact_shard_match.js
@@ -0,0 +1,106 @@
+'use strict';
+
+/**
+ * agg_exact_shard_match.js
+ *
+ * TODO: document.
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload
+load('jstests/concurrency/fsm_workloads/agg_base.js'); // for $config
+
+var $config = extendWorkload($config, function($config, $super) {
+    $config.data.shardKey = { shardKey: 1 };
+    $config.data.numDocs = 10000;
+    $config.data.docSize = 1000;
+    $config.data.nUniqueShardKeys = 100;
+    $config.iterations = 100;
+    $config.threadCount = $config.data.nUniqueShardKeys;
+    $config.transitions = {
+        query: { query: 0.5, insert: 0.5 },
+        insert: { query: 0.5, insert: 0.5 },
+    };
+
+
+    $config.states = {
+        query: function query(db, collName) {
+            assertAlways.commandWorked(db.runCommand({ planCacheClear: collName }));
+            db[collName].aggregate([
+                { $match: { shardKey: this.tid } }
+            ]);
+        },
+        insert: function insert(db, collName) {
+            var key = Random.randInt(this.nUniqueShardKeys);
+            var bulk = db[collName].initializeUnorderedBulkOp();
+            var K = 100;
+            for (var i = 0; i < K; ++i) {
+                bulk.insert(this.padDoc({ shardKey: key }, this.docSize));
+            }
+            var res = bulk.execute();
+            assertWhenOwnColl.writeOK(res);
+        }
+    };
+
+    $config.setup = function setup(db, collName, cluster) {
+        // Load example data.
+        var bulk = db[collName].initializeUnorderedBulkOp();
+        for (var key = 0; key < this.nUniqueShardKeys; ++key) {
+            for (var i = 0; i < this.numDocs / this.nUniqueShardKeys; ++i) {
+                // Note: padDoc caches the large string after allocating it once, so it's ok to call
+                // it in this loop.
+                bulk.insert(this.padDoc({ shardKey: key }, this.docSize));
+            }
+        }
+        var res = bulk.execute();
+        assertWhenOwnColl.writeOK(res);
+        assertWhenOwnColl.commandWorked(db[collName].ensureIndex({ shardKey: 1, other: -1 }));
+
+        // Enable this failpoint to trigger more yields. In MMAPV1, if a record fetch is about to
+        // page fault, the query will yield. This failpoint will mock page faulting on such
+        // fetches every other time.
+        cluster.executeOnMongodNodes(function enableFailPoint(db) {
+            assertAlways.commandWorked(
+                db.adminCommand({ configureFailPoint: 'recordNeedsFetchFail', mode: 'alwaysOn' })
+            );
+        });
+
+        // Lower the following parameters to force even more yields.
+        cluster.executeOnMongodNodes(function lowerYieldParams(db) {
+            assertAlways.commandWorked(
+                db.adminCommand({ setParameter: 1, internalQueryCacheEvictionRatio: 0.0 })
+            );
+            assertAlways.commandWorked(
+                db.adminCommand({ setParameter: 1, internalQueryPlanEvaluationMaxResults: 500 })
+            );
+            assertAlways.commandWorked(
+                db.adminCommand({ setParameter: 1, internalQueryExecYieldIterations: 5 })
+            );
+            assertAlways.commandWorked(
+                db.adminCommand({ setParameter: 1, internalQueryExecYieldPeriodMS: 1 })
+            );
+        });
+        jsTestLog(tojson(cluster.startBalancer()));
+    };
+
+    /*
+     * Reset parameters and disable failpoint.
+     */
+    $config.teardown = function teardown(db, collName, cluster) {
+        $super.teardown(db, collName, cluster);
+
+        cluster.executeOnMongodNodes(function disableFailPoint(db) {
+            assertAlways.commandWorked(
+                db.adminCommand({ configureFailPoint: 'recordNeedsFetchFail', mode: 'off'})
+            );
+        });
+        cluster.executeOnMongodNodes(function resetYieldParams(db) {
+            assertAlways.commandWorked(
+                db.adminCommand({ setParameter: 1, internalQueryExecYieldIterations: 128 })
+            );
+            assertAlways.commandWorked(
+                db.adminCommand({ setParameter: 1, internalQueryExecYieldPeriodMS: 10 })
+            );
+        });
+    };
+
+    return $config;
+});
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 7d04cda..1601121 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -86,7 +86,8 @@ void DocumentSourceCursor::loadBatch() {
     // We have already validated the sharding version when we constructed the PlanExecutor
     // so we shouldn't check it again.
     const NamespaceString nss(_ns);
+    sleepmillis(10000);
     AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);
 
     _exec->restoreState();

And I have verified that the following patch would fix the issue, although I think we have agreed this is not the ideal way to fix it.

diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index dabc86a..bd31130 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -68,21 +68,26 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn, const
 }
 
 AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn,
-                                                   const NamespaceString& nss)
+                                                   const NamespaceString& nss,
+                                                   bool checkShardVersion)
     : _txn(txn),
       _transaction(txn, MODE_IS),
       _db(_txn, nss.db(), MODE_IS),
       _collLock(_txn->lockState(), nss.toString(), MODE_IS),
       _coll(NULL) {
-    _init(nss.toString(), nss.coll());
+    _init(nss.toString(), nss.coll(), checkShardVersion);
 }
 
-void AutoGetCollectionForRead::_init(const std::string& ns, StringData coll) {
+void AutoGetCollectionForRead::_init(const std::string& ns,
+                                     StringData coll,
+                                     bool checkShardVersion) {
     massert(28535, "need a non-empty collection name", !coll.empty());
 
     // We have both the DB and collection locked, which the prerequisite to do a stable shard
     // version check.
-    ensureShardVersionOKOrThrow(_txn, ns);
+    if (checkShardVersion) {
+        ensureShardVersionOKOrThrow(_txn, ns);
+    }
 
     auto curOp = CurOp::get(_txn);
     stdx::lock_guard<Client> lk(*_txn->getClient());
diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h
index b37112c..f47343b 100644
--- a/src/mongo/db/db_raii.h
+++ b/src/mongo/db/db_raii.h
@@ -110,7 +110,9 @@ class AutoGetCollectionForRead {
 
 public:
     AutoGetCollectionForRead(OperationContext* txn, const std::string& ns);
-    AutoGetCollectionForRead(OperationContext* txn, const NamespaceString& nss);
+    AutoGetCollectionForRead(OperationContext* txn,
+                             const NamespaceString& nss,
+                             bool checkShardVersion = true);
     ~AutoGetCollectionForRead();
 
     Database* getDb() const {
@@ -122,7 +124,7 @@ public:
     }
 
 private:
-    void _init(const std::string& ns, StringData coll);
+    void _init(const std::string& ns, StringData coll, bool checkShardVersion = true);
 
     const Timer _timer;
     OperationContext* const _txn;
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 7d04cda..1601121 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -86,7 +86,7 @@ void DocumentSourceCursor::loadBatch() {
     // We have already validated the sharding version when we constructed the PlanExecutor
     // so we shouldn't check it again.
     const NamespaceString nss(_ns);
     sleepmillis(10000);
-    AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);
+    AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss, false);
 
     _exec->restoreState();

Given that our proposed fix is more invasive (we discussed changing the shard version check to happen only when a ClientCursor is constructed, or something like that), and that this bug is very difficult to reproduce, I propose bumping this out of 3.2 into Planning Bucket A or 3.3 Required.
cc redbeard0531, kaloian.manassiev, schwerin

Comment by Charlie Swanson [ 21/Sep/15 ]

Reopening, after realizing that when an aggregation can be targeted to a single shard, this code does not run as part of a getMore, so ensureShardVersionOKOrThrow is not a no-op. Thus, if an aggregation targeting a single shard takes longer to complete than a migration, it will keep throwing a stale shard version error and never complete.
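
To make the distinction concrete, here is a small standalone toy model of the behavior being described. None of these types or helpers are MongoDB's (only ensureShardVersionOKOrThrow corresponds to a real function); it is only meant to illustrate why the check is a no-op for a getMore arriving over a non-versioned connection (the 28/Aug/15 comments below) but repeatedly fatal for a targeted single-shard aggregation that overlaps a migration.

#include <optional>
#include <stdexcept>
#include <string>

// Toy model only: illustrates the reasoning above, not MongoDB's implementation.
struct ShardVersion {
    long long major = 0;
    long long minor = 0;
};

// The shard version the client attached to the connection, if any. In this era, a
// getMore from mongos arrives over a non-versioned connection, so this stays empty.
std::optional<ShardVersion> expectedVersionOnConnection;

// The shard's current view of the collection's version; bumped when a migration commits.
ShardVersion currentShardVersion{1, 0};

void ensureShardVersionOKOrThrowModel(const std::string& ns) {
    if (!expectedVersionOnConnection) {
        // No expected version on the connection: nothing to compare, so this is a no-op.
        return;
    }
    if (expectedVersionOnConnection->major != currentShardVersion.major) {
        // A targeted single-shard aggregation does carry an expected version, so once a
        // migration bumps the version, every subsequent loadBatch() lands here and the
        // aggregation never completes.
        throw std::runtime_error("stale shard version for " + ns);
    }
}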

Comment by Charlie Swanson [ 28/Aug/15 ]

Great. For the record, this is "Works as Designed" because AutoGetCollectionForRead does not end up checking the shard version: this code is always executed as part of a getMore, which happens over a non-sharded connection, so ensureShardVersionOKOrThrow is a no-op.

Comment by Mathias Stearn [ 28/Aug/15 ]

charlie.swanson correct. I think we can resolve this works as designed.

Comment by Charlie Swanson [ 28/Aug/15 ]

redbeard0531, what did we decide to do about this? Nothing? Just update the comment to describe the weirdness that makes this work?

Comment by Charlie Swanson [ 14/Aug/15 ]

Code review url: https://mongodbcr.appspot.com/19530001/

Comment by Mathias Stearn [ 10/Aug/15 ]

Appears to have been accidentally introduced in the locking refactor for 3.0.
