Core Server / SERVER-98054

Change Streams throughput does not scale with the number of shards on a sharded cluster

    • Type: Improvement
    • Resolution: Unresolved
    • Priority: Minor - P4
    • Affects Version/s: None
    • Component/s: Query Execution
    • QE 2025-02-03, QE 2025-02-17, QE 2025-03-03, QE 2025-03-17

      Test scenario 

      1. A large set of documents is inserted into a sharded collection (2 shards); the documents are distributed approximately equally across the shards.
      2. All change events associated with the document inserts are fetched.

      A trace of events in the system

      Event trace notation

      Notation of trace lines:

      <event#> [<node role>] <event description>

      Two additional spaces after <event#> indicate that the event was triggered by the preceding unindented event.

      Time flows from top to bottom.

      Entities and communication

      The mongos process communicates asynchronously with shard A, shard B, and shard cfg (the config server).

      The event trace

      01 [mongos] receives getMore command request
      02   [shard A] receives getMore command request
      03   [shard B] receives getMore command request
      04   [shard cfg] receives getMore command request
      05   [shard B] returns 20304 events in 560ms
      06   [shard A] returns 20306 events in 580ms
      07 [mongos] returns 0 events in 1000ms
      08 [shard cfg] returns 0 events in 1000ms
      09 [mongos] receives getMore command request
      10 [mongos] returns 27685 events in 173ms
      11 [mongos] receives getMore command request
      12 [mongos] returns 12360 events in 69ms
      13 [mongos] receives getMore command request
      14   [shard A] receives getMore command request
      15   [shard cfg] receives getMore command request
      16   [shard A] returns 20226 events in 476ms
      17 [mongos] returns 1182 events in 489ms
      18 [mongos] receives getMore command request
      19   [shard B] receives getMore command request
      20   [shard cfg] receives getMore command request
      21   [shard B] returns 20226 events in 468ms
      22   [shard cfg] returns 0 events in 1000ms
      23 [mongos] returns 27594 events in 630ms
      24 [mongos] receives getMore command request
      25 [mongos] returns 11512 events in 65ms
      26 [mongos] receives getMore command request
      27   [shard A] receives getMore command request
      28   [shard cfg] receives getMore command request
      29   [shard A] returns 19883 events in 447ms
      30 [mongos] returns 1410 events in 460ms
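The per-batch numbers in the trace can be aggregated to estimate the client-visible throughput. A small sketch (plain Python, with the mongos-side result lines from the trace above hard-coded) sums the events and elapsed times of the batches mongos returned to the client:

```python
# Batches returned by mongos to the client, taken from the trace above:
# (events, elapsed_ms) for trace events 07, 10, 12, 17, 23, 25, 30.
mongos_batches = [
    (0, 1000),
    (27685, 173),
    (12360, 69),
    (1182, 489),
    (27594, 630),
    (11512, 65),
    (1410, 460),
]

total_events = sum(events for events, _ in mongos_batches)
total_ms = sum(ms for _, ms in mongos_batches)

# Client-visible throughput in events per second over the traced window.
throughput = total_events / (total_ms / 1000.0)
print(total_events, total_ms, round(throughput))
```

This is only a rough estimate over the traced window; it ignores client-side processing time between getMore calls.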
      

      Observations

      1. After the initial fetch phase (events 01-07), the mongos process alternates between shard A and shard B to fetch the next batch of events, and it must wait for a shard to return data before it can return a batch to the client. For example, before event 17 mongos waits for shard A to return a batch, and even then it does not return a full batch, because the buffer of events from shard B was exhausted before the batch could be filled. The emergent property of this design is that at most one data shard fetches data at a time, so the resulting change stream throughput is even worse than that of a replica set (due to the additional overhead of the extra hops). The reason is straightforward: at any given time, only one shard is likely to have run out of buffered events on mongos, and a new batch is requested only from that shard.
      2. Event sequence 11-12 demonstrates another missed opportunity to exploit the concurrency available in the system: mongos returns a partial batch without (a) requesting more data from the shards and (b) waiting for the specified or default timeout.
      3. The change_stream_listen_throughput performance test confirms that change stream throughput is lower on a sharded cluster than on a replica set.
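A back-of-the-envelope model of observation 1: if each shard-side getMore takes roughly L ms and each shard holds B batches, a scheme that keeps at most one shard fetch in flight drains the stream in about (B_A + B_B) x L, whereas prefetching from both shards concurrently would take about max(B_A, B_B) x L. A minimal sketch, where the batch counts and latency are illustrative assumptions (the ~500 ms latency is comparable to the 450-580 ms shard responses in the trace):

```python
def drain_time_lazy(batches_per_shard, fetch_latency_ms):
    # At most one shard fetch is in flight at a time, so fetches serialize
    # and the total drain time is the sum over all shards.
    return sum(batches_per_shard) * fetch_latency_ms

def drain_time_eager(batches_per_shard, fetch_latency_ms):
    # All shards fetch concurrently; the slowest shard dominates.
    return max(batches_per_shard) * fetch_latency_ms

# Illustrative numbers: 2 shards, 5 batches each, ~500 ms per getMore.
lazy = drain_time_lazy([5, 5], 500)    # fetches serialize: 10 x 500 ms
eager = drain_time_eager([5, 5], 500)  # fetches overlap:    5 x 500 ms
print(lazy, eager)
```

With equal load on both shards the model predicts roughly a 2x difference, consistent with the serialization seen in the trace.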

      Conclusion

      Change stream throughput can be improved by fetching change events from the data shards more eagerly, leveraging all of the concurrency available in the system.
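One way to realize the "fetch more eagerly" idea is to keep a getMore outstanding against every shard cursor at once, rather than only against the shard whose buffer just drained. A toy asyncio sketch (simulated shards with assumed latencies and counts taken from the trace, not the actual mongos router code) illustrating concurrent dispatch:

```python
import asyncio

async def shard_get_more(name, latency_s, n_events):
    # Simulated shard-side getMore: responds after latency_s with n_events.
    await asyncio.sleep(latency_s)
    return name, n_events

async def fetch_round_concurrent():
    # Issue getMore to all shards at once; the wall time for the round is
    # the maximum shard latency rather than the sum of the latencies.
    results = await asyncio.gather(
        shard_get_more("shard A", 0.05, 20306),
        shard_get_more("shard B", 0.05, 20304),
    )
    return dict(results)

buffers = asyncio.run(fetch_round_concurrent())
total = sum(buffers.values())
print(buffers, total)
```

In a real implementation the merged events would still need to respect the change stream's total order across shards; the sketch only shows the concurrent dispatch of the shard-side requests.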

            Assignee: jan.steemann@mongodb.com (Jan Steemann)
            Reporter: mindaugas.malinauskas@mongodb.com (Mindaugas Malinauskas)
            Votes: 2
            Watchers: 11