[SERVER-48694] Push down user-defined stages in a change stream pipeline where possible Created: 10/Jun/20 Updated: 29/Oct/23 Resolved: 11/Oct/21 |
|
| Status: | Closed |
| Project: | Core Server |
| Component/s: | None |
| Affects Version/s: | None |
| Fix Version/s: | 5.1.0-rc0 |
| Type: | Improvement | Priority: | Major - P3 |
| Reporter: | Vinicius Grippa | Assignee: | Backlog - Query Execution |
| Resolution: | Fixed | Votes: | 1 |
| Labels: | change-streams-improvements, qexec-team | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Issue Links: |
|
| Assigned Teams: |
Query Execution
|
| Backwards Compatibility: | Fully Compatible | ||||||||||||
| Sprint: | Query 2020-06-29 | ||||||||||||
| Participants: | |||||||||||||
| Description |
|
Hi, While testing change streams, we observed that data is sent as such from mongoD to mongoS (barring a constant 50% reduction that we have attributed to mongo transport compression). To further support this analysis, we see exactly the same data transfer per entry for watch operations that use filters and for those that do not. This leads us to the conclusion that the projection and match stages are applied only at the mongoS level, leading to high network usage. Also, the change stream reported in the mongoS log does not show the stages:
If we compare with the mongoD:
Where we can observe the $match and $project stages on mongoD. They are using the same cursor:
Can you please clarify? It is not possible to extract an explain() from the change stream, and the documentation is not clear about it. Only with tests and code analysis is it possible to infer these results. |
| Comments |
| Comment by Bernard Gorman [ 11/Oct/21 ] | |||||||||||||||||||||||||||||||||||||||
|
As of release 5.1.0, $match and $project stages will be pushed down from mongoS to the shards where possible. In addition, $match filters will be rewritten such that they can be applied directly to the oplog, for greater efficiency in eliminating non-matching events. | |||||||||||||||||||||||||||||||||||||||
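For illustration, a pipeline of the kind this change affects might look like the sketch below. It assumes a PyMongo-style driver whose collection object exposes `watch(pipeline)`; the helper names `insert_only_pipeline` and `watch_inserts` are hypothetical, and the projected fields are just an example. Whether a given stage is actually pushed down remains the server's decision.

```python
from typing import Any, Dict, List


def insert_only_pipeline() -> List[Dict[str, Any]]:
    """User-defined stages that follow the implicit $changeStream stage."""
    return [
        # Keep only insert events.
        {"$match": {"operationType": "insert"}},
        # Inclusion projection; _id (the resume token) is kept by default.
        {"$project": {"operationType": 1, "documentKey": 1}},
    ]


def watch_inserts(collection: Any) -> Any:
    """Open a change stream on a PyMongo-style collection (assumed interface)."""
    return collection.watch(insert_only_pipeline())
```

The client code is the same before and after 5.1.0; only the place where the server evaluates the `$match` and `$project` stages changes.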
| Comment by Vinicius Grippa [ 04/Jul/20 ] | |||||||||||||||||||||||||||||||||||||||
|
Bernard, Thanks so much for the detailed explanation. I really appreciate the details that you shared and your deep level of knowledge. You just got a fan. Have a great weekend. | |||||||||||||||||||||||||||||||||||||||
| Comment by Bernard Gorman [ 02/Jul/20 ] | |||||||||||||||||||||||||||||||||||||||
|
Your observation is correct - in cases where the user specifies additional pipeline stages, those stages are run on mongoS rather than on mongoD. The reason for this is that the $changeStream stage is actually an alias for a series of internal stages which manage the data stream; in order to do so, each of these stages must see all events generated by the stream, as well as any internal events generated by one of the preceding stages. In a sharded cluster, some of these internal stages must run on mongoS; as a result, we can only apply user-defined filters and projections on mongoS, after the internal stages have had a chance to digest the stream. Pushing down the user's $match or $project stages to the shards would prevent some vital information from being reported to the mongoS at all.
One such case, for instance, concerns stream invalidation. When a $changeStream running on any shard sees a collection drop, it will create an internal event with {operationType: "invalidate"}. The mongoS needs to see this event, so it can take action - in this case, by closing the stream and cleaning up all the cursors on the shards. But say the user has added a stage {$match: {operationType: "insert"}} to the pipeline, which is a perfectly reasonable request to filter out any events that are not of type insert. If we push that down to the shards, then all events of type invalidate will be filtered from the stream before mongoS has a chance to see them. The result is that the stream will never be invalidated, which violates one of change streams' guaranteed behaviours.
While this design was imposed on us due to a variety of constraints during the original implementation of change streams, we now believe that we have reached a point where we can address this issue more thoroughly. I have renamed this ticket in order to better describe the desired outcome, and we will consider this work for scheduling in a future sprint.
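The invalidation problem described above can be sketched with a toy model. This is not server code: `router` is a hypothetical stand-in for the mongoS logic, and the event list is hand-written for the example.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Event = Dict[str, str]


def is_insert(event: Event) -> bool:
    """The user's filter: {$match: {operationType: "insert"}}."""
    return event["operationType"] == "insert"


def router(
    events: Iterable[Event], user_match: Callable[[Event], bool]
) -> Tuple[List[Event], bool]:
    """Toy mongoS: close the stream on invalidate, else apply the user's filter."""
    delivered: List[Event] = []
    for event in events:
        if event["operationType"] == "invalidate":
            return delivered, True  # stream closed, cursors cleaned up
        if user_match(event):
            delivered.append(event)
    return delivered, False  # stream never invalidated


# Events produced on a shard, ending with a collection drop.
SHARD_EVENTS: List[Event] = [
    {"operationType": "insert"},
    {"operationType": "update"},
    {"operationType": "invalidate"},
]

# Filter applied on the router: the invalidate event is still seen.
_, closed_on_router = router(SHARD_EVENTS, is_insert)

# Naive pushdown: the shard filters first, so invalidate never arrives.
pushed_down = [e for e in SHARD_EVENTS if is_insert(e)]
_, closed_with_pushdown = router(pushed_down, lambda e: True)

print(closed_on_router, closed_with_pushdown)  # True False
```

In this model, a safe pushdown would have to let internal events such as invalidate through regardless of the user's filter.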
One other point regarding explain output: while we do not display each of the individual $changeStream stages in our explain output, the output does indeed show you where the user-defined stages are running. Here is the explain output from a 3.6 mongoS running a $changeStream with an additional user-defined $match stage:
As you can see from the above, the shardsPart field indicates that only the $changeStream is dispatched to the shards; the $match is executed in the mergerPart pipeline, which per the mergeType field is running on mongos. The $sort stage in the mergerPart indicates that the mongoS is merge-sorting the results from the shards before running the consolidated stream through the user's $match.
I hope the information I've provided above is useful. Thanks again for bringing this issue to our attention, and for your interest in change streams! Best regards, | |||||||||||||||||||||||||||||||||||||||
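As a rough aid for reading such explain output, the helper below summarises which stages appear in each part of the pipeline. It is a sketch under assumptions: the field names shardsPart, mergerPart and mergeType come from the comment above, but their nesting under a `splitPipeline` key is assumed here, and `locate_stages` is a hypothetical helper, not a driver or server API.

```python
from typing import Any, Dict, List


def stage_names(stages: List[Dict[str, Any]]) -> List[str]:
    """Return the $-prefixed stage name of each stage document, in order."""
    return [key for stage in stages for key in stage if key.startswith("$")]


def locate_stages(explain: Dict[str, Any]) -> Dict[str, Any]:
    """Summarise where stages run, given an explain document as a dict.

    Assumes shardsPart/mergerPart sit under a top-level "splitPipeline"
    key and that mergeType is a top-level field.
    """
    split = explain.get("splitPipeline") or {}
    return {
        "shards": stage_names(split.get("shardsPart", [])),
        "merger": stage_names(split.get("mergerPart", [])),
        "merge_type": explain.get("mergeType"),
    }
```

A user-defined `$match` listed under `"merger"` with a merge type of `mongos` would correspond to the situation described in this comment.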
| Comment by Vinicius Grippa [ 10/Jun/20 ] | |||||||||||||||||||||||||||||||||||||||
|
I forgot to mention: this is MongoDB version 3.6. |