[SERVER-71565] Resuming a change stream with a $match filter which excludes the resume point fails Created: 23/Nov/22  Updated: 14/Aug/23

Status: Backlog
Project: Core Server
Component/s: None
Affects Version/s: 6.0.3
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Eric Daniels Assignee: Backlog - Query Execution
Resolution: Unresolved Votes: 0
Labels: changestreams
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Attachments: watcher.js, watcher_workaround1.js, writer.js, writer2.js
Issue Links:
Related
related to SERVER-67677 Resuming a change stream with a resum... Backlog
Assigned Teams:
Query Execution
Operating System: ALL
Participants:

 Description   

So I just hit this when upgrading from 4.4.18 to 6.0.3. I believe this is either a regression or previously undefined behavior that was fixed in a way that broke backwards compatibility.

In my scenario, we are waiting for calls to "robots". Once we receive a call to a robot based on its name (an insert), we latch onto the call document and wait for future updates in a more selective change stream. In order to prevent races, we use the resume token from the original change stream. What ends up happening is that the server thinks the resume token no longer exists. What appears to actually be happening is that in 6.0 the server increments the cluster time ordinal of the token passed in by one. Depending on the volume of writes to the oplog, there's a chance (especially in testing) that a token at that incremented cluster time does not exist yet, resulting in an error. If a single write happens after the first insert, so that the incremented cluster time does exist, no error occurs, but we miss the next update event, which does not appear to make sense (and feels like a bug). Attached are two sets of reproduction scripts.
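The attached scripts aren't reproduced inline; roughly, they follow this mongosh sketch (the database and collection names match the error output below; the update payload is illustrative):

// watcher.js (sketch): wait for an insert, then resume a narrower stream
// from that event's resume token, watching only for updates.
const coll = db.getSiblingDB("data").stuff;
const inserts = coll.watch([{ $match: { operationType: "insert" } }]);

let event = null;
while (event === null && !inserts.isClosed()) {
  if (inserts.hasNext()) event = inserts.next();
}
printjson(event);

// Resume from the insert's token, filtering for updates to that document.
// On 6.0.3 the getMore fails with "cannot resume stream; the resume token
// was not found".
const updates = coll.watch(
  [{ $match: { operationType: "update", "documentKey._id": event.documentKey._id } }],
  { resumeAfter: event._id }
);
while (!updates.isClosed()) {
  if (updates.hasNext()) { printjson(updates.next()); break; }
}

// writer.js (sketch): insert a document, then update it.
const res = coll.insertOne({ host: "host1" });
coll.updateOne({ _id: res.insertedId }, { $set: { answered: true } }); // field name illustrative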

Repro 1:

The first script to execute, watcher.js, waits for a document to be inserted and then starts a new change stream that waits for the next update to that document, using the resume token so as not to miss any updates. Then execute writer.js, which inserts a document and then updates it. You should see an error like:

{
  _id: {
    _data: '82637D61C1000000012B022C0100296E5A1004C28754200E80474C90E2EAA34447212546645F69640064637D61C184E895993F2B1C840004'
  },
  operationType: 'insert',
  clusterTime: Timestamp({ t: 1669161409, i: 1 }),
  wallTime: ISODate("2022-11-22T23:56:49.618Z"),
  fullDocument: { _id: ObjectId("637d61c184e895993f2b1c84"), host: 'host1' },
  ns: { db: 'data', coll: 'stuff' },
  documentKey: { _id: ObjectId("637d61c184e895993f2b1c84") }
}
MongoServerError: Executor error during getMore :: caused by :: cannot resume stream; the resume token was not found. {_data: "82637D61C1000000022B022C0100296E5A1004C28754200E80474C90E2EAA34447212546645F69640064637D61C184E895993F2B1C840004"}

If you inspect the tokens, you'll see that they are off by one in the cluster time ordinal.

> decodeResumeToken('82637D61C1000000012B022C0100296E5A1004C28754200E80474C90E2EAA34447212546645F69640064637D61C184E895993F2B1C840004')
{
  timestamp: new Timestamp({ t: 1669161409, i: 1 }),
  version: 1,
  tokenType: 128,
  txnOpIndex: 0,
  fromInvalidate: false,
  uuid: new UUID("c2875420-0e80-474c-90e2-eaa344472125"),
  documentKey: { _id: new ObjectId("637d61c184e895993f2b1c84") }
}
> decodeResumeToken('82637D61C1000000022B022C0100296E5A1004C28754200E80474C90E2EAA34447212546645F69640064637D61C184E895993F2B1C840004')
{
  timestamp: new Timestamp({ t: 1669161409, i: 2 }),
  version: 1,
  tokenType: 128,
  txnOpIndex: 0,
  fromInvalidate: false,
  uuid: new UUID("c2875420-0e80-474c-90e2-eaa344472125"),
  documentKey: { _id: new ObjectId("637d61c184e895993f2b1c84") }
}

Repro 2:

Run watcher.js again, but this time run writer2.js, which inserts another document before the update. In this case watcher.js just hangs forever, which I would expect not to happen and which again leads me to believe something fishy is going on.

This also works fine on 5.0.14. Presently, this issue is preventing us from upgrading our Atlas cluster to 6.0.3.

It's possible this is related to SERVER-67677.



 Comments   
Comment by Eric Daniels [ 05/Dec/22 ]

Ah thanks very much for that note on the transaction caveat with my use of incrementing the time! Also thank you for the explanation about the underlying issue, it's very helpful. Hopefully it's not a pain to fix at this point! Happy holidays!

Comment by Bernard Gorman [ 05/Dec/22 ]

Hey eric@erdaniels.com! I believe the issue you're seeing here is because, on 6.0, we push down and rewrite user filters on a $changeStream pipeline into the oplog wherever possible. As such, when you issue the second stream that filters only for update events but provide a resume token that refers to an insert event, the insert is filtered out of the stream at the level of the oplog scan, meaning that the DSCSEnsureResumeTokenPresent stage never sees it.

In 5.0 and earlier, this was not the case; filters were applied only after all change stream stages had processed each event on the collection. As a result, a resume token could be used with any arbitrary pipeline and would still work. This was never an intended feature of change streams, but rather an incidental side-effect of implementation details. Pushing down filters into the oplog confers massive performance improvements in cases where we can do so, but it also strengthens the requirement that the resumed stream must have a filter that is at least as broad as that on the original stream. This is different than the issue described in SERVER-67677, where resuming with the same pipeline fails because the updateLookup document has changed in the meantime, and no longer matches that pipeline.
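To make that concrete (the pipelines below are illustrative, not taken from the attached scripts; insertEvent stands for the event whose token you're resuming from):

// Fails on 6.0+: the pipeline excludes the insert event the resume token
// refers to, so the oplog scan filters it out before
// DSCSEnsureResumeTokenPresent ever sees it.
db.stuff.watch(
  [{ $match: { operationType: "update" } }],
  { resumeAfter: insertEvent._id }
);

// Works: this pipeline is at least as broad as the original stream's, so
// the resume point is still surfaced to the resume-token check.
db.stuff.watch(
  [{ $match: { operationType: { $in: ["insert", "update"] } } }],
  { resumeAfter: insertEvent._id }
);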

To work around this, as you noted above, you can use startAtOperationTime with the clusterTime of the event. I would advise against incrementing the low bits of the timestamp, since if the insert and update occur within a transaction (and thus share the same clusterTime) this will cause your stream to miss the update. Starting at the same clusterTime and filtering for update with documentKey._id should be enough.
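In sketch form, reusing the event variable from the repro scripts:

// Start at the insert's clusterTime (inclusive) instead of resuming after
// its token; the insert itself is dropped by the operationType filter, and
// the documentKey filter keeps the stream narrow.
const updates = db.getSiblingDB("data").stuff.watch(
  [{ $match: { operationType: "update", "documentKey._id": event.documentKey._id } }],
  { startAtOperationTime: event.clusterTime }
);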

> fails with the first work around when/if the first changestream doesn't return an event and instead we find an event via a FindOne (for performance reasons in our call queue). When that case happens, we don't have a change event to look at to pull out cluster time but we know that we need to only resume from the point where a change stream started so that we don't miss any updates.

In this case, you can still obtain the time at which the change stream started directly from the stream itself. Run the initial change stream with a batchSize of 0; this will return no events, but by calling getResumeToken on the cursor you'll obtain the HWM token corresponding to the start point. Then iterate the stream as normal. If you don't see any insert events, do a findOne to retrieve the document and then start the update-monitoring stream using resumeAfter with the HWM you recorded at the start.
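A sketch of that flow (collection and filter are illustrative):

// Open the stream with an empty first batch and record the start-point
// high-water-mark token before consuming any events.
const coll = db.getSiblingDB("data").stuff;
const cs = coll.watch([{ $match: { operationType: "insert" } }], { batchSize: 0 });
const hwm = cs.getResumeToken();

// ...iterate cs as normal. If no insert event shows up and the document is
// instead found via findOne, resume the update stream from the recorded HWM:
const doc = coll.findOne({ host: "host1" });
const updates = coll.watch(
  [{ $match: { operationType: "update", "documentKey._id": doc._id } }],
  { resumeAfter: hwm }
);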

Hope this helps!

Best regards,
Bernard

Comment by Eric Daniels [ 28/Nov/22 ]

Thanks Kelsey! All is well; hope you're good too

Comment by Kelsey Schubert [ 28/Nov/22 ]

Thanks for the report eric@erdaniels.com; I hope you're doing well. I'm assigning this to the query team to investigate.

Comment by Eric Daniels [ 23/Nov/22 ]

So after more experimentation, particularly in Atlas, the code in https://github.com/viamrobotics/goutils/blob/main/rpc/wrtc_call_queue_mongodb.go#L296 fails with the first workaround when/if the first change stream doesn't return an event and we instead find an event via a FindOne (for performance reasons in our call queue). When that happens, we don't have a change event from which to pull a cluster time, but we know we need to resume from the point where the change stream started so that we don't miss any updates. One option would be to use the Go driver at a very low level, but that doesn't feel very safe or reliable. That leads us to the second workaround, turning the event token into a HWM token, but that isn't really feasible without porting some token serialization code over, which again feels discouraged and risks compatibility issues. We will hold off on an upgrade for now.

Comment by Eric Daniels [ 23/Nov/22 ]

I cannot edit the original post, but for repro 2, I updated the script and it just produces the same error; that's my bad. No fishy behavior on repro 2.

In terms of workarounds:
1. (works) Using the clusterTime of the event, increment the low bits by 1 so that you don't look for a specific token but instead start from the point where you know you want to pick up. You can use:

{ startAtOperationTime: new Timestamp(firstTS.getHighBits(), firstTS.getLowBits() + 1) }

See watcher_workaround1.js, and the sketch after this list.
2. I'm curious whether (and fairly certain that) you could create a high watermark token at the event's cluster time instead of it being an event token (depends on the actual issue here). This would let you still use resumeAfter, but the token internals are hard for me to access.
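watcher_workaround1.js isn't reproduced inline; in sketch form, workaround 1 looks like this:

// Workaround 1 (sketch): start one cluster-time ordinal past the insert
// event rather than resuming after its token.
const firstTS = event.clusterTime;
const updates = db.getSiblingDB("data").stuff.watch(
  [{ $match: { operationType: "update", "documentKey._id": event.documentKey._id } }],
  { startAtOperationTime: new Timestamp(firstTS.getHighBits(), firstTS.getLowBits() + 1) }
);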

Thinking about this more, I think the bug is that for some reason the server is incrementing the event token's cluster time by one, and an event at that time could never possibly exist in my scenario.

Due to the workarounds, we can proceed with our upgrade, but I believe this is still a fairly impactful, and subtle, compatibility issue.
