Uploaded image for project: 'Core Server'
  1. Core Server
  2. SERVER-47460

The dispatching and merging policy about change stream in MongoS

    • Type: Icon: Question Question
    • Resolution: Done
    • Priority: Icon: Major - P3 Major - P3
    • None
    • Affects Version/s: None
    • Component/s: Querying
    • Labels:
      None

      I'm very interested in the change stream, so I go through some documentation posts on the official website. But there is no document introduce the change stream inner details especially on sharding, so I read the source code in `cluster_aggregate.cpp` starts from `runAggregate` function in v4.0. However, I'm not quite understanding the dispatching and merging policy details. Hope to get help here.

      Here come my basic understandings about change streams in sharding, please let me know if I'm wrong:
      1. Users send change stream command by collection/db/global watch, the driver will parse the `watch` to `aggregate` command with the first stage is $changestream.
      2. MongoS receives the command and dispatches the command to all shards no matter whether there is a corresponding db/collection on the shard.
      3. MongoD receives the modified `aggregate` command and then runs two steps: $match and transform. $match stage does filter the given oplog by oplog cursor; transform stage then converts the oplog to change stream event.
      4. MongoS receives the cursor responses and then runs the merging policy.

      I've some questions about the dispatching and merging policy:
      1. Since v3.6, MongoDB uses the logical time as timestamp which keeps the causal consistency. So different MongoS may have a different timestamp. In the change stream, MongoS uses local logical time if no `afterClusterTime` options given, will this cause some data loss? For example, mongos1 timestamp is 10:00, mongos2 is 10:02, shard1 is 09:59, shard2 is 10:01, if users send change stream command to mongos1, then mongos1 will send the aggregate command to each shard begin with 10:00, and the shard1 oplog from 09:59~10:00 will lost. I also find a jira SERVER-31767. It means this problem has been solved since v4.1.1 by global point time, right?
      2. About the merging step, mongos will merge different cursors into one before returning back to the user. The merge policy is by the resume token which includes: clusterTime, documentKey(shard key), UUID. Is there any "wait policy" in mongos before sort the change stream events? For example, shard1 returns events with ts=05:10, will mongos returns it to users immediately? or it will wait for all oplogs from other shards older than 05:10 to be received before replying to the user? If not, the causal consistency of move chunk can not be guaranteed.

            Assignee:
            Unassigned Unassigned
            Reporter:
            cvinllen@gmail.com vinllen chen
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: