[SERVER-29929] Report highest observed cluster time from change stream on each shard Created: 29/Jun/17 Updated: 30/Oct/23 Resolved: 19/Sep/17
| Status: | Closed |
| Project: | Core Server |
| Component/s: | Querying, Sharding |
| Affects Version/s: | None |
| Fix Version/s: | 3.6.0-rc0 |
| Type: | Task | Priority: | Major - P3 |
| Reporter: | Charlie Swanson | Assignee: | Matthew Russotto |
| Resolution: | Fixed | Votes: | 0 |
| Labels: | None |
| Remaining Estimate: | Not Specified |
| Time Spent: | Not Specified |
| Original Estimate: | Not Specified |
| Issue Links: |
| Backwards Compatibility: | Fully Compatible |
| Sprint: | Repl 2017-07-31, Repl 2017-08-21, Repl 2017-09-11, Repl 2017-10-02 |
| Participants: | |
| Description |
When a change stream is initiated against a sharded cluster, merging the results can be tricky. A change stream will establish a cursor against each shard and merge the results from those shards in order of 'clusterTime'. If each cursor returns at least one result, the _id will contain the cluster time and can be used to merge the results and return the smallest one. However, if at least one shard returns an empty batch, we cannot yet return anything to the client, for fear that that shard will later return something at a smaller clusterTime. To avoid this issue, each shard should report, on every response (including responses with an empty batch), the highest cluster time it has observed, so that mongos can safely return any buffered result whose cluster time is no greater than the smallest reported time.
In order to report this information back on each response, we will modify the capped insert notifier to keep track of the most recent optime observed. One proposal is to pass a callback to the notifier for it to use to decide when to wake the waiting thread. If we take this approach, the callback can save a reference to the OperationContext and continually advance some sort of boost::optional<Timestamp> decoration on the OperationContext. The aggregate and getMore commands can then consult whether this is set to decide whether (and what) to include in this new response field. Please forgive me if I've used poor terminology; I'm pretty new to this whole 'cluster time' thing.
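For illustration only, here is a minimal standalone C++ sketch of the decoration idea described above. It is not the server's actual API: the member names and signatures (`CappedInsertNotifier::registerWaiter`, `OperationState`, `highestObservedTime`) are assumptions made for the example.

```cpp
#include <functional>
#include <mutex>
#include <optional>
#include <utility>

// Stand-in for an optime/cluster time, e.g. (100, 5).
using Timestamp = std::pair<unsigned /*secs*/, unsigned /*inc*/>;

// Stand-in for the OperationContext "decoration" mentioned above.
struct OperationState {
    std::optional<Timestamp> highestObservedTime;
};

class CappedInsertNotifier {
public:
    // The callback decides whether the waiter should wake and may record the
    // optime of the insert it was shown.
    using InsertCallback = std::function<bool(const Timestamp&)>;

    void registerWaiter(InsertCallback cb) {
        std::lock_guard<std::mutex> lk(_mutex);
        _callback = std::move(cb);
    }

    // Called for every insert into the capped collection (here, the oplog).
    void notifyOfInsert(const Timestamp& optime) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_callback && _callback(optime)) {
            // Real code would signal a condition variable to wake the waiter.
        }
    }

private:
    std::mutex _mutex;
    InsertCallback _callback;
};

// The aggregate/getMore path would register something like this, and later
// consult opState.highestObservedTime when building the cursor response.
void watchForChanges(CappedInsertNotifier& notifier, OperationState& opState) {
    notifier.registerWaiter([&opState](const Timestamp& optime) {
        if (!opState.highestObservedTime || *opState.highestObservedTime < optime)
            opState.highestObservedTime = optime;
        return true;  // a real predicate would also check whether the insert matches
    });
}
```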
| Comments |
| Comment by Githook User [ 19/Sep/17 ] |
Author: {'username': 'mtrussotto', 'name': 'Matthew Russotto', 'email': 'matthew.russotto@10gen.com'}
Message:
| Comment by Charlie Swanson [ 10/Jul/17 ] |
Misha's suggestion is an interesting one. This information is only really needed when we're merging results from tailable cursors, though, since otherwise anything that comes back can be returned. Given that we only support tailable cursors on capped collections, and we don't support capped sharded collections, I'm going to start working on a POC that adds an extra argument to the response rather than overwriting $logicalTime/$clusterTime. It shouldn't be that hard to change later if we decide to make it more general, though.
| Comment by Misha Tyulenev [ 03/Jul/17 ] |
Another way to address it is to make afterClusterTime applicable only to the data seen by the command, i.e. in this case scenario (2) mentioned by Charlie will work. All other assumptions for afterClusterTime will stay true, because if it satisfies the collection data it also satisfies the node-wide operations. The benefits are:
| Comment by Spencer Brody (Inactive) [ 03/Jul/17 ] |
Yeah, I think the overall approach of this ticket makes sense. Some of the implementation details are a little foggy (for instance, I don't really understand the change you are proposing to make to the capped insert notifier), but I think the goal of what we're trying to achieve sounds right. It will be important to make sure this plays well with readConcern:majority - the new value we add to the cursor responses must be the last committed timestamp we observed; otherwise we risk having an earlier timestamp generated later, after a rollback.
| Comment by Charlie Swanson [ 03/Jul/17 ] |
On a separate note: I don't think we're very interested in changing the tailable + awaitData behavior of not reporting an error when exceeding maxTimeMS. If there are no writes to one shard in the system, we don't want to end up closing that cursor; we just want to keep waiting for changes.
| Comment by Charlie Swanson [ 03/Jul/17 ] |
After discussing with misha.tyulenev, I'm now even more convinced this is necessary. Suppose we were merging results from two shards, and we got back these responses:
It seems okay to return the document from the first shard, since the second shard has an operationTime at least as great as the timestamp of the first document. But the second shard might actually have a matching operation at time (100, 5) that it didn't report because the cursor didn't see it. For example, the oplog on the second shard might look like this:
If this is the case, it would be incorrect to return the operation with time (100, 6), since the operation on the second shard with time (100, 5) would come first. There is currently nothing in the response (other than an empty batch) that would indicate there could be a missing result. In order to correctly merge the response, we need a guarantee that the cursor on the second shard will not return anything before time (100, 6). To get this, we could do one of the following:
Option 2 sounds appealing because piggy-backing off of the 'readAfterClusterTime' behavior would avoid any custom logic. Unfortunately, spencer informs me that we do not generate such no-op writes unless there are no writes on that node, so we could run into problems if the particular collection we're interested in is idle, but the node has lots of traffic on other collections. So then I think we're left with either generating no-op writes in certain scenarios (figuring out when seems non-trivial...), or reporting back this extra piece of metadata in the response. I think our best option is to add this metadata to the response, which would give the mongos merging logic a useful piece of information without generating or relying on writes to the collection of interest.
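To make the merging argument concrete, the sketch below shows (in standalone C++, not actual mongos code) the rule this extra piece of metadata would enable: a buffered document can be returned only once every shard has promised, either via a buffered result or via a reported highest observed cluster time, that it will not produce anything earlier. The type and field names (`ShardCursorState`, `highestObservedTime`) are illustrative assumptions.

```cpp
#include <optional>
#include <utility>
#include <vector>

using Timestamp = std::pair<unsigned, unsigned>;  // e.g. (100, 5)

// What mongos knows about one shard's change stream cursor after its latest response.
struct ShardCursorState {
    std::optional<Timestamp> nextBufferedDocTime;  // smallest buffered result, if any
    std::optional<Timestamp> highestObservedTime;  // hypothetical field, reported even with an empty batch
};

// The earliest cluster time this shard could still produce.
std::optional<Timestamp> minPromisedTime(const ShardCursorState& shard) {
    return shard.nextBufferedDocTime ? shard.nextBufferedDocTime : shard.highestObservedTime;
}

// True if the smallest buffered document (at 'candidateTime') is safe to return to the client.
bool canReturn(const Timestamp& candidateTime, const std::vector<ShardCursorState>& shards) {
    for (const auto& shard : shards) {
        auto promised = minPromisedTime(shard);
        if (!promised)
            return false;  // no lower bound from this shard yet; keep waiting
        if (*promised < candidateTime)
            return false;  // this shard might still produce an earlier event
    }
    return true;
}
```

Under this rule, in the example above, an empty batch from the second shard with a reported highest observed time of (100, 4) would correctly block returning the (100, 6) document, while a reported time of (100, 6) or later would make it safe, since any matching operation at (100, 5) would already have been scanned.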
| Comment by Charlie Swanson [ 30/Jun/17 ] |
schwerin, we thought about that; it almost works. The trouble comes when your cursor returns an empty batch not because it read all the way through the oplog and nothing matched, but rather because it read as much as it could in the given maxTimeMS and nothing matched. In this scenario, you don't actually know that there's nothing before the operation time returned from the command, since you haven't finished reading the oplog. For example, suppose the oplog contained:
I believe the reported operation time would be (104, 5), correct? If our getMore was scanning from behind, it might examine (98, 1) and (99, 0), then realize it's exceeded its time limit and return an empty batch. Another option would be to return 'exceededTimeLimit' in this scenario, which we do for normal queries, but don't do for tailable awaitData queries:
I'm not sure if this special awaitData behavior was intentional, but it doesn't seem very intuitive...
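As a standalone illustration (not actual server code; the timestamps, the scan loop, and the `lastScanned` field are assumptions), the sketch below shows why an empty batch from a time-limited scan carries no lower bound by itself, and how reporting how far the scan got would fix that:

```cpp
#include <chrono>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

using Timestamp = std::pair<unsigned, unsigned>;  // e.g. (98, 1)

struct ScanResult {
    std::vector<Timestamp> matches;        // empty in both cases contrasted below
    std::optional<Timestamp> lastScanned;  // the extra information this ticket proposes reporting
};

ScanResult scanOplog(const std::vector<Timestamp>& oplog,
                     const std::function<bool(const Timestamp&)>& isMatch,
                     std::chrono::steady_clock::time_point deadline) {
    ScanResult result;
    for (const auto& entry : oplog) {
        if (std::chrono::steady_clock::now() >= deadline)
            break;  // maxTimeMS expired partway through, e.g. after examining (99, 0)
        result.lastScanned = entry;
        if (isMatch(entry))
            result.matches.push_back(entry);
    }
    return result;
}

// Without 'lastScanned', an empty 'matches' after getting only as far as (99, 0) is
// indistinguishable from an empty 'matches' after scanning through (104, 5), so the
// merger cannot treat the node's reported operation time as a lower bound on future results.
```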
| Comment by Andy Schwerin [ 30/Jun/17 ] |
Can we somehow use the logical clock for this? I feel like the operation time returned on each command (not the cluster time, but the operation time) can be used to provide this guarantee. Check with misha.