Core Server / SERVER-33944

Change stream on sharded collection with non-simple default collation is erroneously invalidated upon chunk migration

    • Type: Bug
    • Resolution: Gone away
    • Priority: Major - P3
    • None
    • Affects Version/s: None
    • Component/s: Querying
    • Query
    • ALL
    • 60

      As background, when a change stream is opened on a collection that doesn't exist, the change stream is invalidated if the collection is created with a collation other than the collation specified on the change stream.
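      The invalidation rule above can be modeled as a comparison between the collation the stream is using and the collation carried by the collection-creation event. This is a hypothetical sketch for illustration only, not the server's implementation: collations are plain objects compared shallowly, field by field, the simple collation is written as {locale: "simple"}, and the function names are invented.

```javascript
// Hypothetical model of the invalidation rule; not MongoDB server code.
const SIMPLE = {locale: "simple"};
const caseInsensitive = {locale: "en_US", strength: 2};

// Shallow, field-by-field equality of two collation specs (assumption:
// specs contain only primitive values, as in this report).
function sameCollation(a, b) {
    const keysA = Object.keys(a).sort();
    const keysB = Object.keys(b).sort();
    if (keysA.length !== keysB.length) return false;
    return keysA.every((k, i) => k === keysB[i] && a[k] === b[k]);
}

// A stream opened before the collection exists is invalidated when the
// collection is created with a collation other than the stream's.
function shouldInvalidateOnCreate(streamCollation, createdCollation) {
    return !sameCollation(streamCollation, createdCollation);
}

console.log(shouldInvalidateOnCreate(SIMPLE, caseInsensitive));  // true
console.log(shouldInvalidateOnCreate(caseInsensitive, {strength: 2, locale: "en_US"}));  // false
```

      Field order does not matter in this model, which is why the second call does not invalidate.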

      The following steps can lead to a change stream getting erroneously invalidated (say we have a sharded cluster with 2 shards and 1 mongos):
      1) Create a collection with a non-simple collation.
      2) Shard the collection and insert some data (all of the data goes to the primary shard, shard 0).
      3) Create a change stream on that collection with no collation specified.
      4) Move a chunk from shard 0 to shard 1.
      5) Read from the change stream.

      At step 3 we will establish cursors on all of the shards, using the collection-default collation. On shard 0 this is whatever non-simple collation we created the collection with. Shard 1 doesn't know the collection exists, so its default is the simple collation.
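      To make the mismatch concrete, the sketch below shows how the repro's equality match ({"fullDocument.text": "abc"}) treats a document with text "aBc" under each shard's default. It uses JavaScript's Intl.Collator with sensitivity "accent" as an approximation of ICU strength 2 (case-insensitive, accent-sensitive); it is not MongoDB's collation code, and makeEquals is a hypothetical helper.

```javascript
// Hypothetical helper: returns an equality function for a collation spec.
// Intl.Collator is only a stand-in for MongoDB's ICU-based collation.
function makeEquals(collation) {
    if (collation.locale === "simple") {
        // Simple collation: plain binary string comparison.
        return (a, b) => a === b;
    }
    if (collation.strength !== 2) {
        throw new Error("only strength 2 is modeled in this sketch");
    }
    // Strength 2 ignores case but respects accents, like sensitivity "accent".
    const collator = new Intl.Collator(collation.locale.replace("_", "-"),
                                       {sensitivity: "accent"});
    return (a, b) => collator.compare(a, b) === 0;
}

const shard0Default = {locale: "en_US", strength: 2};  // collection exists here
const shard1Default = {locale: "simple"};              // collection unknown here

console.log(makeEquals(shard0Default)("aBc", "abc"));  // true
console.log(makeEquals(shard1Default)("aBc", "abc"));  // false
```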

      At step 4, the migration creates the collection on shard 1, so a 'createCollection' oplog entry is written there. When shard 1 reads from the change stream, it sees that 'createCollection' entry carrying the collection's non-simple collation, which differs from the simple collation its cursor is using. This triggers an invalidate.
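      The sequence can be simulated with a toy model of two shards. Everything here (the catalog maps, the cursor objects, the shape of the creation event) is hypothetical and only mirrors the reasoning in this report, not mongod internals.

```javascript
// Hypothetical simulation of the steps above; structures are invented.
const SIMPLE = {locale: "simple"};
const caseInsensitive = {locale: "en_US", strength: 2};
const ns = "test.change_stream_case_insensitive";

// Shallow, field-by-field equality of two collation specs.
function sameCollation(a, b) {
    const ka = Object.keys(a).sort();
    const kb = Object.keys(b).sort();
    return ka.length === kb.length && ka.every((k, i) => k === kb[i] && a[k] === b[k]);
}

// Each shard's local catalog: namespace -> default collation.
const shards = {
    shard0: new Map([[ns, caseInsensitive]]),
    shard1: new Map(),  // collection not created here yet
};

// Step 3: no collation on the stream, so each shard's cursor resolves the
// default it sees locally (simple if the collection is missing).
const cursors = {};
for (const [name, catalog] of Object.entries(shards)) {
    cursors[name] = {collation: catalog.get(ns) || SIMPLE, invalidated: false};
}

// Step 4: the migration creates the collection on shard1 with its real collation.
shards.shard1.set(ns, caseInsensitive);
const createEvent = {kind: "createCollection", ns, collation: caseInsensitive};

// Step 5: shard1's cursor reads the creation event and applies the rule.
if (!sameCollation(cursors.shard1.collation, createEvent.collation)) {
    cursors.shard1.invalidated = true;
}

console.log(cursors.shard0.invalidated);  // false
console.log(cursors.shard1.invalidated);  // true
```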

      Here's a repro:

      (function() {
          load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
          load("jstests/libs/change_stream_util.js");        // For 'ChangeStreamTest'.
      
          const st = new ShardingTest({
              shards: 2,
              mongos: 1,
              rs: {
                  nodes: 1,
              },
          });
          const s = st.s;
          let db = s.getDB("test");
      
          print("Disabling balancer...");
          assert.commandWorked(db.adminCommand({balancerStop: 1}));
          let cst = new ChangeStreamTest(db);
      
          const caseInsensitive = {locale: "en_US", strength: 2};
      
          let caseInsensitiveCollectionName = "change_stream_case_insensitive";
          assertDropCollection(db, caseInsensitiveCollectionName);
      
          const caseInsensitiveCollection =
              assertCreateCollection(db, caseInsensitiveCollectionName, {collation: caseInsensitive});

          assert.commandWorked(db.adminCommand({enableSharding: "test"}));
          st.ensurePrimaryShard('test', st.shard0.shardName);
      
          // Insert some docs; before sharding, they all live on the primary shard (shard 0).
          for (let i = 0; i < 10; i += 2) {
              assert.writeOK(caseInsensitiveCollection.insert({_id: i, text: "aBc"}));
              assert.writeOK(caseInsensitiveCollection.insert({_id: i + 1, text: "abc"}));
          }
      
          assert.commandWorked(caseInsensitiveCollection.createIndex({_id: "hashed"},
                                                                     {collation: {locale: "simple"}}));
          let res = db.adminCommand(
              {shardCollection: caseInsensitiveCollection.getFullName(),
               key: {_id: 'hashed'}, collation: {locale: "simple"}});
          assert.commandWorked(res);
      
          // Open the change stream without specifying a collation.
          const implicitCaseInsensitiveStream = cst.startWatchingChanges({
              pipeline: [
                  {$changeStream: {}},
                  {$match: {"fullDocument.text": "abc"}},
                  // Be careful not to use _id in this projection, as startWatchingChanges() will exclude
                  // it by default, assuming it is the resume token.
                  {$project: {docId: "$documentKey._id"}}
              ],
              collection: caseInsensitiveCollection
          });
          
          // Move the chunk containing {_id: 1} to shard 1. This creates the collection on shard 1.
          assert.commandWorked(db.adminCommand({
              moveChunk: caseInsensitiveCollection.getFullName(),
              find: {_id: 1},
              to: st.rs1.getURL(),
              _waitForDelete: false
          }));
      
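          // Insert new docs after the migration; the stream only reports writes made after it
          // was opened. Under the collection's case-insensitive collation, both match "abc".
          assert.writeOK(caseInsensitiveCollection.insert({_id: 10, text: "aBc"}));
          assert.writeOK(caseInsensitiveCollection.insert({_id: 11, text: "abc"}));

          // Read from the change stream. With this bug, shard 1 returns an invalidate instead.
          cst.assertNextChangesEqual(
              {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 10}, {docId: 11}]});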
      
          st.stop();
      })();
      

      Run it with: resmoke.py --suites=no_passthrough repro-file.js

      CC charlie.swanson david.storch nicholas.zolnierz

            Assignee:
            backlog-server-query Backlog - Query Team (Inactive)
            Reporter:
            ian.boros@mongodb.com Ian Boros
            Votes:
            0
            Watchers:
            6
