When a change stream is initiated against a sharded cluster, merging the results can be tricky. A change stream will establish a cursor against each shard, and merge the results from those shards in order of 'clusterTime'. If each cursor returns at least one result, the _id of each result will contain its cluster time, which can be used to merge the results and return the smallest one. However, if at least one shard returns an empty batch, we cannot yet return anything to the client, because that shard might later return something at a smaller clusterTime. To avoid this issue, we should do the following:
- Add a new internal field, called something like _internalLatestClusterTimeObserved, to the cursor section of both the aggregation response and the response of each getMore. This will report the highest cluster time that the cursor has observed, even if the entry at that time didn't match the criteria and thus was not returned.
- This value can be treated as a guarantee from the shard that it will never return data before that cluster time on that cursor.
- The logic merging cursors on mongos can then return any results that have a cluster time smaller than the minimum reported by all cursors.
- The merging logic can then safely run a getMore using a 'read after cluster time' with its desired time to advance a cursor's guaranteed-minimum-optime. However, it still needs to inspect the results of this response, since it's possible the cursor will still report a smaller optime if the getMore exceeded its maxTimeMS.
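The merge rule in the bullets above could be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the actual mongos code: cluster times are modelled as plain 64-bit integers, and ShardCursorState and drainReady are hypothetical names invented for the example. Results whose cluster time equals the minimum guarantee are held back, following the "smaller than" wording above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <limits>
#include <vector>

// Assumption: a cluster time is modelled as a plain integer for this sketch.
using ClusterTime = std::uint64_t;

// Hypothetical per-shard state kept by the merging logic.
struct ShardCursorState {
    std::deque<ClusterTime> buffered;  // results not yet returned, ascending
    ClusterTime latestObserved = 0;    // value of the proposed response field
};

// Removes and returns, in ascending order, every buffered result whose
// cluster time is strictly smaller than the minimum guarantee reported
// by all shard cursors.
std::vector<ClusterTime> drainReady(std::vector<ShardCursorState>& shards) {
    std::vector<ClusterTime> ready;
    if (shards.empty()) {
        return ready;
    }

    // No shard will return data before its own latestObserved, so the
    // smallest of these values bounds what is safe to return.
    ClusterTime minGuarantee = std::numeric_limits<ClusterTime>::max();
    for (const auto& shard : shards) {
        minGuarantee = std::min(minGuarantee, shard.latestObserved);
    }

    for (auto& shard : shards) {
        assert(std::is_sorted(shard.buffered.begin(), shard.buffered.end()));
        while (!shard.buffered.empty() && shard.buffered.front() < minGuarantee) {
            ready.push_back(shard.buffered.front());
            shard.buffered.pop_front();
        }
    }

    std::sort(ready.begin(), ready.end());
    return ready;
}
```

For example, if one shard has buffered results at times 3 and 5 and another shard has returned only empty batches but reports an observed time of 4, the result at 3 can be returned while the one at 5 has to wait until the second shard's reported time advances past it.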
In order to report this information back on each response, we will modify the capped insert notifier to keep track of the most recent optime observed. One proposal is to pass a callback to the notifier for it to use to decide when to wake the waiting thread. If we take this approach, the callback can save a reference to the OperationContext, and continually advance some sort of boost::optional<Timestamp> decoration on the OperationContext. The aggregate and getMore commands can then consult whether this is set to decide whether/what to include in this new response field.
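A rough sketch of the callback idea, to make the proposal concrete. Everything here is a stand-in: OpCtxSketch is a hypothetical substitute for the real OperationContext and its decoration, std::optional is used in place of boost::optional so the example is self-contained, Timestamp is a plain integer, and makeTrackingCallback and responseField are invented names. The notifier itself is not modelled; the callback is simply what it would invoke for each inserted entry.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>

// Assumption: a timestamp is modelled as a plain integer for this sketch.
using Timestamp = std::uint64_t;

// Hypothetical stand-in for the decoration on the OperationContext.
struct OpCtxSketch {
    std::optional<Timestamp> latestObserved;
};

// The callback receives the optime of a newly inserted entry and returns
// whether the waiting thread should be woken.
using WakePredicate = std::function<bool(Timestamp)>;

// Wraps a match predicate so that every observed optime advances the value
// stored on the context, whether or not the entry matches.
WakePredicate makeTrackingCallback(OpCtxSketch& opCtx, WakePredicate matches) {
    assert(matches && "a match predicate is required");
    return [&opCtx, matches = std::move(matches)](Timestamp ts) {
        if (!opCtx.latestObserved || *opCtx.latestObserved < ts) {
            opCtx.latestObserved = ts;
        }
        return matches(ts);
    };
}

// What the aggregate and getMore commands would consult: an empty optional
// means the new response field is omitted.
std::optional<Timestamp> responseField(const OpCtxSketch& opCtx) {
    return opCtx.latestObserved;
}
```

The stored value only ever moves forward, so an entry observed out of order would not lower the guarantee already reported to mongos.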
Please forgive me if I've used poor terminology; I'm pretty new to this whole 'cluster time' thing.
- is depended on by
SERVER-29141 Add sharding support for merging changeNotification responses from all shards