Query using $or and $ne on sharded collection can include orphan documents


    • Query Optimization
    • Fully Compatible
    • ALL
    • v8.2, v8.0, v7.0, v6.0
      import {ShardingTest} from "jstests/libs/shardingtest.js";
      
      const docs = [
          {_id: 0, t: 6, m: NumberInt(0)},
          {_id: 1, t: 0, m: NumberInt(0)}
      ];
      const query = [
          {$project: {a: "$t"}},
          {$match: {$or: [{a: 6, m: {$ne: 1}}]}}
      ];
      
      const st = new ShardingTest({shards: 2, mongos: 1});
      const shardedDb = st.getDB("test");
      assert(st.adminCommand({enablesharding: "test", primaryShard: st.shard0.shardName}));
      
      const coll = shardedDb.coll;
      assert(coll.drop());
      assert.commandWorked(coll.insert(docs));
      assert.commandWorked(coll.createIndex({t: 1}));
      
      const resultsBeforeSharding = coll.aggregate(query).toArray();
      jsTestLog('results before sharding');
      jsTestLog(resultsBeforeSharding);
      // [ { "_id" : 0, "a" : 6 } ]
      
      assert(st.adminCommand({shardcollection: `test.coll`, key: {t: 1}}));
      // One doc on each shard
      assert(st.adminCommand({split: `test.coll`, middle: {t: 3}}));
      // shard0 has the t=0 doc, shard1 has the t=6 doc
      assert(st.adminCommand({moveChunk: `test.coll`, find: {t: 6}, to: st.shard1.shardName}));
      
      jsTestLog(st.getDB("admin").aggregate([{$shardedDataDistribution: {}}]).toArray());
      // One orphan doc exists on shard0
      
      const resultsAfterSharding = coll.aggregate(query).toArray();
      jsTestLog('results after sharding');
      jsTestLog(resultsAfterSharding);
      // [ { "_id" : 0, "a" : 6 }, { "_id" : 0, "a" : 6 } ]
      jsTestLog(coll.explain().aggregate(query));
      // no shard filter stage
      
      assert.eq(resultsBeforeSharding, resultsAfterSharding, {resultsBeforeSharding, resultsAfterSharding});
      
      st.stop();
      

      Issue Status

      ISSUE DESCRIPTION AND IMPACT
      On sharded clusters, in the presence of moveChunk operations, aggregation queries that rename the shard key (for example via $project) and then apply a disjunction ($or) over the renamed field may produce duplicate or stale results.

      Example:
      Consider a collection sharded on the field shardKey. A query that renames shardKey (here to renamedShardKey) and references the renamed field in a disjunction may return duplicate or stale results while orphaned documents from chunk migrations are present. In the example below we explicitly force the split and moveChunk operations that, under normal operation, the server performs automatically.

      Assuming a cluster with two shards, "shard001" and "shard002":

      const docs = [
         {_id: 0, shardKey: 6, data: NumberInt(0)},
         {_id: 1, shardKey: 0, data: NumberInt(0)},
      ];
      
      db.coll.insert(docs);
      
      db.coll.createIndex({ shardKey: 1 });
      
      sh.shardCollection("test.coll", { shardKey: 1 });
      
      sh.splitAt("test.coll", { shardKey: 3 });
      
      // The migration can leave an orphaned copy of the shardKey: 6 document
      // on the donor shard until the range deleter runs
      sh.moveChunk("test.coll", { shardKey: 6 }, "shard001");
      
      const query = [
         {$project: {renamedShardKey: "$shardKey"}},
         {$match: {$or: [{renamedShardKey: 6, data: {$ne: 1}}]}},
      ];
      
      db.coll.aggregate(query);
      // Result: [ { "_id" : 0, "renamedShardKey" : 6 }, { "_id" : 0, "renamedShardKey" : 6 } ]
      // (duplicate result)
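
      To confirm that the migration left an orphan behind, the cluster's data distribution can be inspected through mongos, as the repro above does with $shardedDataDistribution. A minimal sketch, assuming the database is named "test":
      
      // Each shard entry reports numOrphanedDocs for the namespace; a
      // non-zero count on the donor shard is the orphan that the
      // $or/$ne query can pick up.
      db.getSiblingDB("admin").aggregate([
          {$shardedDataDistribution: {}},
          {$match: {ns: "test.coll"}}
      ]).forEach(dist => printjson(dist.shards));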
      

      REMEDIATION AND WORKAROUNDS
      Users are advised to upgrade to a fixed version. Alternatively, ensure that no orphaned documents exist, for example by setting '_waitForDelete: true' on moveChunk operations.
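
      As a sketch of the workaround, a manual migration can be made to wait for orphan cleanup before returning; the namespace, shard key, and shard name below are illustrative:
      
      // _waitForDelete makes the donor shard finish deleting the migrated
      // range before the command returns, so no orphans are left behind.
      db.adminCommand({
          moveChunk: "test.coll",
          find: {shardKey: 6},
          to: "shard001",
          _waitForDelete: true
      });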

      ------------------------------------------------------

      Original description

      In the repro attached, we insert two documents into a collection and then shard the collection. Then we move a chunk so that each shard owns a document, but one shard has an orphaned document.

      After running the query in the repro, we see a document appear twice: because the query plan lacks a shard filter stage, the orphaned document is included in the results.
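
      The missing stage can be checked in the explain output. A minimal sketch; on an unaffected plan, each shard's winning plan includes a SHARDING_FILTER stage, which is what removes orphaned documents:
      
      const explain = db.coll.explain().aggregate(query);
      // Prints false on affected versions: no shard filter stage is
      // present, so orphans on the donor shard flow into the result set.
      print(tojson(explain).includes("SHARDING_FILTER"));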

      Assignee: Matt Olma
      Reporter: Matt Boros
      Votes: 0
      Watchers: 24

              Created:
              Updated:
              Resolved: