[SERVER-41196] Mongos invariant failure crash with change streams startAfter Created: 16/May/19  Updated: 29/Oct/23  Resolved: 29/Jun/19

Status: Closed
Project: Core Server
Component/s: None
Affects Version/s: 4.1.11
Fix Version/s: 4.2.0-rc3, 4.3.1

Type: Bug Priority: Major - P3
Reporter: Shane Harvey Assignee: Bernard Gorman
Resolution: Fixed Votes: 0
Labels: query-44-grooming
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Attachments: Text File mongos-python-1720-crash.log    
Issue Links:
Backports
Related
is related to PYTHON-1720 Support 'startAfter' option to the $c... Closed
Backwards Compatibility: Fully Compatible
Operating System: ALL
Backport Requested:
v4.2, v4.0
Sprint: Query 2019-06-17, Query 2019-07-01, Query 2019-07-15
Participants:

 Description   

Mongos crashes with this invariant failure:

2019-05-16T16:29:55.522-0700 F  -        [TaskExecutorPool-0] Invariant failure compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0 src/mongo/s/query/async_results_merger.cpp 525
2019-05-16T16:29:55.522-0700 F  -        [TaskExecutorPool-0] 
 
***aborting after invariant() failure
 
 
2019-05-16T16:29:55.542-0700 F  -        [TaskExecutorPool-0] Got signal: 6 (Abort trap: 6).
 0x1027b0eb9 0x1027b076d 0x7fff5be3cf5a 0x7fff5bcd83c6 0x7fff5bbda1ae 0x1027a46b2 0x101c8624c 0x101c81ece 0x101c8696b 0x101c8645a 0x101c8905a 0x101b2daf0 0x101df7af4 0x101df63b9 0x101df83ec 0x101df9f2e 0x101df9c09 0x101dfa299 0x101df5e35 0x101df7869 0x101e123a4 0x101e12838 0x101b21b6a 0x101e11e1c 0x101b21b6a 0x101e14cb2 0x101e149c0 0x101e15088 0x101b21b6a 0x101e0d3f1 0x101b21b6a 0x101e63186 0x101b21b6a 0x101e62c81 0x101b21b6a 0x101e62480 0x101b21b6a 0x101e61d71 0x101b21b6a 0x101e52381 0x101b21b6a 0x101e40640 0x101b21b6a 0x101e3ff0a 0x101e4161a 0x101e414c9 0x10215c421 0x102151d32 0x102151ba9 0x101e57a74 0x101e07398 0x101e0db1f 0x7fff5be46661 0x7fff5be4650d 0x7fff5be45bf9
----- BEGIN BACKTRACE -----
...

Attached is mongos-python-1720-crash.log, which includes the full mongos log.
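
For context, the invariant that fires is the results merger's check that the minimum sort key reported by a shard cursor (for change streams, its postBatchResumeToken) never compares lower than the one it reported previously. A minimal standalone sketch of that check, using simplified types rather than the real AsyncResultsMerger API:

// Minimal sketch (not the actual mongos code): the merger requires each
// shard's reported minimum sort key to be monotonically non-decreasing.
#include <cassert>
#include <string>

// Resume tokens order lexicographically by their encoded _data string,
// simplified here to a plain std::string.
struct SortKey {
    std::string data;
};

int compareSortKeys(const SortKey& newKey, const SortKey& oldKey) {
    return newKey.data.compare(oldKey.data);
}

void updateMinSortKey(SortKey& oldMinSortKey, const SortKey& newMinSortKey) {
    // A token earlier than the previously recorded one means the shard's
    // stream appears to have gone back in time; the invariant aborts mongos.
    assert(compareSortKeys(newMinSortKey, oldMinSortKey) >= 0);
    oldMinSortKey = newMinSortKey;
}

int main() {
    SortKey oldMin{"B"};  // minimum sort key previously reported by a shard
    SortKey newMin{"A"};  // a later response reports a lower sort key
    // "A" sorts before "B", so this mirrors the failure in the log above:
    // running the sketch fails the assert, just as mongos aborts.
    updateMinSortKey(oldMin, newMin);
}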

The reproduction is fairly simple:

  1. Start a change stream on collection "x", drop "x", and obtain the invalidate change stream document's resume token.
  2. Start a new change stream passing the previous resume token to startAfter.
  3. Run a getMore on the new change stream -> crash.

In code:

MongoDB Enterprise mongos> var cs = db.test.watch([{'$match': {'operationType': 'invalidate'}}]);
MongoDB Enterprise mongos> db.test.insertOne({});
{
	"acknowledged" : true,
	"insertedId" : ObjectId("5cddf66ab38812eee363f373")
}
MongoDB Enterprise mongos> db.test.drop();
true
MongoDB Enterprise mongos>
MongoDB Enterprise mongos> var doc = cs.next();
MongoDB Enterprise mongos> var resume_token = doc['_id'];
MongoDB Enterprise mongos>
MongoDB Enterprise mongos> var cs = db.test.watch([], {startAfter: resume_token});
MongoDB Enterprise mongos> cs.next();
2019-05-16T16:46:51.624-0700 E  QUERY    [js] uncaught exception: Error: error doing query: failed: network error while attempting to run command 'getMore' on host '127.0.0.1:27018'  :
DB.prototype.runCommand@src/mongo/shell/db.js:170:23
DBCommandCursor.prototype._runGetMoreCommand@src/mongo/shell/query.js:803:18
DBCommandCursor.prototype._hasNextUsingCommands@src/mongo/shell/query.js:836:9
DBCommandCursor.prototype.hasNext@src/mongo/shell/query.js:844:16
DBCommandCursor.prototype.next@src/mongo/shell/query.js:863:14
@(shell):1:1
2019-05-16T16:46:51.626-0700 I  NETWORK  [js] trying reconnect to 127.0.0.1:27018 failed
2019-05-16T16:46:51.626-0700 I  NETWORK  [js] reconnect 127.0.0.1:27018 failed failed
2019-05-16T16:46:51.629-0700 I  NETWORK  [js] trying reconnect to 127.0.0.1:27018 failed
2019-05-16T16:46:51.630-0700 I  NETWORK  [js] reconnect 127.0.0.1:27018 failed failed

I'm using version:

mongos version v4.1.11-61-g7e1682c
git version: 7e1682c579f0b719fd4988e04b9b63eea0ebd03c
allocator: system
modules: enterprise
build environment:
    distarch: x86_64
    target_arch: x86_64



 Comments   
Comment by Githook User [ 29/Jun/19 ]

Author:

{'name': 'Bernard Gorman', 'email': 'bernard.gorman@gmail.com', 'username': 'gormanb'}

Message: SERVER-41196 Integrate 'invalidate' tokens into change stream's standard resume logic

(cherry picked from commit 7e4423b458fcefd37a62ebecf168716166b7dc4c)
Branch: v4.2
https://github.com/mongodb/mongo/commit/498b7851cfa4d4860fd6d865647c24f680b32bbd

Comment by Githook User [ 29/Jun/19 ]

Author:

{'name': 'Bernard Gorman', 'email': 'bernard.gorman@gmail.com', 'username': 'gormanb'}

Message: SERVER-41196 Integrate 'invalidate' tokens into change stream's standard resume logic
Branch: master
https://github.com/mongodb/mongo/commit/7e4423b458fcefd37a62ebecf168716166b7dc4c

Comment by Bernard Gorman [ 13/Jun/19 ]

justin.seyster: Approach (2) looks good! Approach (1) won't work, both because the ChangeStreamProxy does not exist on mongoS, and because mongoS does not have any concept of a postBatchResumeToken separate from what it receives from the shards. The AsyncResultsMerger on mongoS simply reads the PBRT returned from each shard and then returns the lowest of these to the client. For the initial batch, we therefore must return the exact, unmodified resume token as our first PBRT, otherwise we may hit the issue described in my previous comment.
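
For reference, a minimal sketch of the merging rule described above, with simplified types rather than the actual AsyncResultsMerger interface: mongoS keeps no PBRT of its own and simply returns the lowest of the per-shard tokens, so for the zero-batchSize initial aggregate (where each shard just echoes the client's startAfter token) the first merged PBRT is necessarily that exact token.

// Sketch only (not the real AsyncResultsMerger): mongoS returns the lowest
// of the postBatchResumeTokens most recently reported by the shards.
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Resume tokens order lexicographically by their encoded _data string.
using Token = std::string;

Token mergedPostBatchResumeToken(const std::vector<Token>& shardTokens) {
    // The merged stream can only safely resume from a point every shard has
    // already reached, i.e. the minimum of the per-shard tokens.
    return *std::min_element(shardTokens.begin(), shardTokens.end());
}

int main() {
    // With batchSize 0, each shard reports the client's startAfter token Ri
    // as its first PBRT, so the first merged PBRT is exactly Ri -- which is
    // why mongoS cannot substitute a modified token for the initial batch.
    std::vector<Token> initialBatch{"Ri", "Ri"};
    assert(mergedPostBatchResumeToken(initialBatch) == "Ri");
}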

Comment by Justin Seyster [ 11/Jun/19 ]

bernard.gorman Thanks for that explanation! I have two new proposals.

1: I can tweak the proposal from my previous comment so that it only strips the "fromInvalidate" from the resume token on the shards (not on the mongos). The patch would look the same, except the first if statement would be:

    if (_pipeline->getContext()->inMongos &&
        resumeTokenData.fromInvalidate == ResumeTokenData::kFromInvalidate)

2: I worked out how to implement the solution you proposed:

diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
index 7750bee..3e23dbc 100644
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -47,8 +47,13 @@ ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
     // pipeline construction, and use it to obtain the starting time for _latestOplogTimestamp.
     invariant(!_pipeline->getContext()->initialPostBatchResumeToken.isEmpty());
     _postBatchResumeToken = _pipeline->getContext()->initialPostBatchResumeToken.getOwned();
+    auto resumeTokenData = ResumeToken::parse(_postBatchResumeToken).getData();
+    if (resumeTokenData.fromInvalidate == ResumeTokenData::kFromInvalidate) {
+        resumeTokenData.fromInvalidate = ResumeTokenData::kNotFromInvalidate;
+        _invalidateToken = ResumeToken(resumeTokenData).toDocument().toBson();
+    }
     if (!_pipeline->getContext()->needsMerge || _pipeline->getContext()->mergeByPBRT) {
-        _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+        _latestOplogTimestamp = resumeTokenData.clusterTime;
     }
 }
 
@@ -58,7 +63,10 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
         // the latest event observed in the oplog, the latter via its sort key metadata field.
         auto nextBSON = _validateAndConvertToBSON(*next);
         _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
-        _postBatchResumeToken = next->getSortKeyMetaField();
+        if (!_invalidateToken || (_invalidateToken->woCompare(next->getSortKeyMetaField()) != 0)) {
+            _postBatchResumeToken = next->getSortKeyMetaField();
+        }
+        _invalidateToken = boost::none;
         _setSpeculativeReadTimestamp();
         return nextBSON;
     }
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
index 3659dff..b5efe54 100644
--- a/src/mongo/db/exec/change_stream_proxy.h
+++ b/src/mongo/db/exec/change_stream_proxy.h
@@ -93,5 +93,6 @@ private:
 
     Timestamp _latestOplogTimestamp;
     BSONObj _postBatchResumeToken;
+    boost::optional<BSONObj> _invalidateToken = boost::none;
 };
 }  // namespace mongo

I'm interested in your thoughts on both of these. I'll also spend some more time validating each approach. I already have a jstest that is ready for review once we're happy with a fix.

Comment by Bernard Gorman [ 06/Jun/19 ]

justin.seyster: I don't think that will work, because the PBRT used to seed ChangeStreamProxy is always returned to the client as the first PBRT in the resumed stream. We'd end up returning a PBRT which was earlier than the resume token the client used as the startAfter argument. So the following could happen:

  1. Stream is invalidated, user gets resume token Ri
  2. User requests startAfter:Ri
  3. ChangeStreamProxy substitutes Rd for Ri as the initial PBRT
  4. Driver gets back initial batch of 0 results with PBRT Rd
  5. Network interruption: driver disconnects
  6. Driver attempts to restart from the most recent PBRT, i.e. startAfter:Rd
  7. We're now resuming from the actual drop rather than the invalidate, so the stream returns the drop event and then immediately re-invalidates itself.

Comment by Justin Seyster [ 06/Jun/19 ]

bernard.gorman Thanks for this extremely comprehensive explanation! I want to get your thoughts on a suggestion. What do you think about modifying the ChangeStreamProxyStage constructor so that, when it gets initialized with a "fromInvalidate" resume token, we strip the "fromInvalidate" flag, effectively setting the shard's first PBRT to Rd instead of Ri (in step 4 above)?

I think that your step 8 approach makes the same amount of sense logically, but I'm concerned that the implementation will need to either parse the ResumeTokenData in each and every ChangeStreamProxyStage::getNextBson() call or maintain some additional state in the ChangeStreamProxyStage object to keep track of when we are initializing from a "fromInvalidate" resume token.

Here's the main idea of what I'm proposing:

diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
index 7750bee..8e97ee8 100644
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -47,8 +47,13 @@ ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
     // pipeline construction, and use it to obtain the starting time for _latestOplogTimestamp.
     invariant(!_pipeline->getContext()->initialPostBatchResumeToken.isEmpty());
     _postBatchResumeToken = _pipeline->getContext()->initialPostBatchResumeToken.getOwned();
+    auto resumeTokenData = ResumeToken::parse(_postBatchResumeToken).getData();
+    if (resumeTokenData.fromInvalidate == ResumeTokenData::kFromInvalidate) {
+        resumeTokenData.fromInvalidate = ResumeTokenData::kNotFromInvalidate;
+        _postBatchResumeToken = ResumeToken(resumeTokenData).toDocument().toBson();
+    }
     if (!_pipeline->getContext()->needsMerge || _pipeline->getContext()->mergeByPBRT) {
-        _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+        _latestOplogTimestamp = resumeTokenData.clusterTime;
     }
 }

This fixed the problem in my jstest, but I haven't sat down to think through any possible wrinkles yet.

Comment by Bernard Gorman [ 03/Jun/19 ]

justin.seyster, I think I know what's going on here. TL;DR: if the drop event is the last entry in the oplog at the moment we startAfter the invalidate's resume token, then we will return a postBatchResumeToken to mongoS which precedes the client-supplied resume token, tripping this invariant.

In SERVER-38413 (this commit), we changed the resume logic in DSShardCheckResumability such that each individual shard swallows all events in their oplog that precede the resume token, for reasons that are explained in the first comment of this code review. However, if you look at the relevant code, you will see that we do not consider the fromInvalidate field when checking the ordering of the tokens from the resumed stream against the client's resume token. This isn't fatal on its own, but it is the root of the problem. Unfortunately, this omission is intentional, and we can't resolve the issue by simply adding a check in here. A full explanation follows.

All fromInvalidate events are pseudo-events that are generated following a collection drop, rename, etc. We create them by returning the drop event and queueing up an invalidate to be returned immediately after. From the linked comment, note that the resume token of the invalidate event is almost identical to the drop which generated it:

    // The new entry will have a nearly identical resume token to the notification for the command,
    // except with an extra flag indicating that the token is from an invalidate. This flag is
    // necessary to disambiguate the two tokens, and thus preserve a total ordering on the stream.

So what's happening here is:

  1. The collection is dropped across all the shards, and each shard generates a drop event in their oplogs. Assume that the drop entry on shard A occurs at clusterTime T. Call the resume token of the drop event Rd.
  2. mongoS retrieves the drop event from shard A, and it passes through DocumentSourceCheckInvalidate en route to the client. Internally, DSCheckInvalidate queues up an invalidate event.
  3. When the client requests the next event, DSCheckInvalidate returns the invalidate with a resume token that is identical to Rd, except that it has fromInvalidate:true. Call this token Ri. Notably, it will have a clusterTime of T, just like the drop does.
  4. mongoS re-starts the stream with startAfter:Ri. Because the batchSize of the initial aggregate is 0 and we therefore don't touch the oplog, every shard will report their first postBatchResumeToken (PBRT) as Ri, the resume token they were given by the client.
  5. When mongoS issues its first getMores, the shards start running the code in DSShardCheckResumability. This examines all events at clusterTime T and swallows any events that precede the resume token. This is necessary because the operation at clusterTime T may be a transaction, and the resume token might be mid-applyOps.
  6. Because it has the same clusterTime T as the invalidate resume token Ri, DSShardCheckResumability retrieves the drop event from the oplog. It compares the drop's resume token Rd to the invalidate's resume token Ri and, because of the omitted fromInvalidate check, it believes that it has found an exact match for the resume token.
  7. The shard therefore allows the drop event to pass into the pipeline. Because we used startAfter, we ignore the first invalidating event we encounter, and so the drop passes through to ChangeStreamProxy at the end of the pipeline without queueing up an invalidate.
  8. Upon receiving this new event, ChangeStreamProxy updates its PBRT to that of the drop. In other words, this causes the current PBRT to revert from Ri back to Rd, despite the fact that Rd is earlier than Ri.
  9. Here's where the last-in-oplog factor comes into play: after processing the drop and incorrectly setting its PBRT back-in-time, ChangeStreamProxy will then attempt to pull the next entry from the stream. If there are no further entries, then ChangeStreamProxy will check whether the last observed point in the oplog is later than the clusterTime of the most recent event. If so, it will generate a high-water-mark token at that clusterTime. Therefore, if the oplog has moved past the drop event, the "poison" PBRT Rd from the drop event will be overwritten by a high-water-mark PBRT at clusterTime T+N. The PBRT returned to mongoS will therefore be later than Ri, and the stream will proceed as though nothing had happened.
  10. If the drop event is the final entry in the oplog, then ChangeStreamProxy will (correctly) not attempt to generate a high-water-mark, and will return Rd as its PBRT. We then hit the invariant described in this ticket when mongoS notices that the stream appears to have gone back-in-time.

As mentioned above, we unfortunately cannot take the simple approach of adding a fromInvalidate check in compareAgainstClientResumeToken, because this omission is intentional. We never generate a new invalidate in a stream resumed from an invalidate using startAfter, because doing so would cause the stream to be re-invalidated immediately. The exact resume token supplied by the client therefore never actually appears in the resumed stream. Since we have a strict requirement that the resumed stream should never progress past the resume token without seeing it, a strict match on the fromInvalidate field would prevent us from being able to startAfter at all. We therefore allow an invalidate resume token to match the resume token of the drop which generated it.
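
To make that comparison rule concrete, here is a standalone sketch with simplified fields (not the server's actual ResumeTokenData or DSShardCheckResumability code) of a token ordering in which fromInvalidate is the tie-breaker, alongside a resume-time match that deliberately ignores it:

// Sketch only: fromInvalidate gives the invalidate a token that sorts just
// after the drop which generated it, but resume-time matching ignores the
// flag so that startAfter-ing the invalidate can still "find" the drop.
#include <cassert>
#include <string>
#include <tuple>

struct TokenData {
    unsigned long long clusterTime;  // simplified stand-in for the Timestamp
    std::string eventIdentifier;     // simplified stand-in for the event's key
    bool fromInvalidate;
};

// Total ordering over tokens: the flag is the final tie-breaker.
int compareFull(const TokenData& a, const TokenData& b) {
    auto ka = std::tie(a.clusterTime, a.eventIdentifier, a.fromInvalidate);
    auto kb = std::tie(b.clusterTime, b.eventIdentifier, b.fromInvalidate);
    return ka < kb ? -1 : (kb < ka ? 1 : 0);
}

// Resume-time matching: an invalidate token is allowed to match the drop
// that generated it, since the invalidate itself never reappears in a
// stream resumed with startAfter.
bool matchesClientResumeToken(const TokenData& event, const TokenData& clientToken) {
    return event.clusterTime == clientToken.clusterTime &&
           event.eventIdentifier == clientToken.eventIdentifier;
}

int main() {
    TokenData rd{100, "drop-x", false};  // the drop event's token Rd
    TokenData ri{100, "drop-x", true};   // the invalidate token Ri derived from it

    assert(compareFull(rd, ri) < 0);           // Rd sorts strictly before Ri...
    assert(matchesClientResumeToken(rd, ri));  // ...yet is accepted as a match,
                                               // so the shard lets the drop back
                                               // into the resumed pipeline.
}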

We also can't easily modify ChangeStreamProxy's initial resume token to remove the fromInvalidate if it is present, because then the first batch returned to the client would have a postBatchResumeToken that is earlier than the one it originally handed to mongoS. One solution would be to have ChangeStreamProxy retain the parsed ResumeTokenData from the initial PBRT if fromInvalidate is true. When we get the drop event, we can then confirm that it is identical to the initial PBRT other than the fromInvalidate field, and leave the PBRT unmodified (i.e. skip step 8 above).
