Uploaded image for project: 'Core Server'
  1. Core Server
  2. SERVER-32085

$changeStream reports incorrect documentKey for unsharded collections that become sharded

    • Fully Compatible
    • ALL
    • v3.6
    • Query 2017-12-04, Query 2017-12-18

      Following SERVER-30834, an active $changeStream on an unsharded collection will detect when that collection becomes sharded, and will open additional cursors on the new shards as chunks migrate to them. However, because we never re-establish the cursor on the original shard and it has already cached the unsharded documentKey as _id, all insert operations performed on the primary shard post-shardCollection will continue to produce $changeStream entries whose documentKey is _id and which omit the shard key.

      The primary shard therefore becomes a 'poison pill' for $changeStream from this point onwards:

      • All resume tokens taken from insert operations on the primary shard - including all inserts that occur post-sharding - are unusable. This is because the token's documentKey field won't match the documentKey returned for that oplog entry by the resumed $changeStream, which does include the shard key. DocumentSourceEnsureResumeTokenPresent consequently rejects the resume attempt and uasserts.
      • Because the sort key for sharded $changeStream is <ts:1,uuid:1,documentKey:1> this bug will produce an undefined sort order for operations from different shards which have the same timestamp (i.e. those which are the Nth operation on their respective shards within the same second).

      The existing change_streams_unsharded_becomes_sharded.js test does not exercise this behaviour because it (a) shards the collection on _id, so the documentKey is the same pre- and post-sharding; and (b) does not retrieve any documents from the $changeStream pre-sharding, so the stream is effectively dormant and the documentKey is not cached. The following patch demonstrates the bug:

      diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
      index e172b28f46..5db3b86319 100644
      --- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
      +++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
      @@ -29,6 +29,7 @@
           const mongosColl = mongosDB[testName];
      
           mongosDB.createCollection(testName);
      +    mongosColl.createIndex({x: 1});
      
           st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
      
      @@ -38,44 +39,57 @@
               cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: mongosColl});
           assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
      
      +    // Verify that the cursor picks up documents inserted while the collection is unsharded.
      +    assert.writeOK(mongosColl.insert({_id: 0, x: 0}));
      +    cst.assertNextChangesEqual({
      +        cursor: cursor,
      +        expectedChanges: [{
      +            documentKey: {_id: 0},
      +            fullDocument: {_id: 0, x: 0},
      +            ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
      +            operationType: "insert",
      +        }]
      +    });
      +
           // Enable sharding on the previously unsharded collection.
           assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
      
      -    // Shard the collection on _id.
      +    // Shard the collection on x.
           assert.commandWorked(
      -        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
      +        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {x: 1}}));
      
           // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
      -    assert.commandWorked(
      -        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
      +    assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {x: 0}}));
      
      -    // Verify that the cursor is still valid and picks up the inserted document.
      -    assert.writeOK(mongosColl.insert({_id: 1}));
      +    // Move the [minKey, 0) chunk to shard1.
      +    assert.commandWorked(mongosDB.adminCommand({
      +        moveChunk: mongosColl.getFullName(),
      +        find: {x: -1},
      +        to: st.rs1.getURL(),
      +        _waitForDelete: true
      +    }));
      +
      +    // Make sure the change stream cursor sees a document inserted on the new shard.
      +    assert.writeOK(mongosColl.insert({_id: -1, x: -1}));
           cst.assertNextChangesEqual({
               cursor: cursor,
               expectedChanges: [{
      -            documentKey: {_id: 1},
      -            fullDocument: {_id: 1},
      +            documentKey: {x: -1, _id: -1},
      +            fullDocument: {_id: -1, x: -1},
                   ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
                   operationType: "insert",
               }]
           });
      
      -    // Move the [minKey, 0) chunk to shard1 and write a document to it.
      -    assert.commandWorked(mongosDB.adminCommand({
      -        moveChunk: mongosColl.getFullName(),
      -        find: {_id: -1},
      -        to: st.rs1.getURL(),
      -        _waitForDelete: true
      -    }));
      -    assert.writeOK(mongosColl.insert({_id: -1}));
      -
      -    // Make sure the change stream cursor sees the inserted document even after the moveChunk.
      +    // Verify that the cursor on the original shard is still valid and sees new inserted documents.
      +    // TODO: SERVER-32085 The 'documentKey' should now include the shard key in addition to _id.
      +    // This test fails at present.
      +    assert.writeOK(mongosColl.insert({_id: 1, x: 1}));
           cst.assertNextChangesEqual({
               cursor: cursor,
               expectedChanges: [{
      -            documentKey: {_id: -1},
      -            fullDocument: {_id: -1},
      +            documentKey: {x: 1, _id: 1},
      +            fullDocument: {_id: 1, x: 1},
                   ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
                   operationType: "insert",
               }]
      

            Assignee:
            bernard.gorman@mongodb.com Bernard Gorman
            Reporter:
            bernard.gorman@mongodb.com Bernard Gorman
            Votes:
            0 Vote for this issue
            Watchers:
            10 Start watching this issue

              Created:
              Updated:
              Resolved: