  1. Node.js Driver
  2. NODE-2662

The data that pipes through a changeStream is duplicated and compounded exponentially upon refreshing the browser connected to an SSE event

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Major - P3
    • Affects Version/s: 3.5.9
    • Component/s: None
    • Labels:

      When I watch a collection using the code below, the data change is returned as expected. The returned data is then sent to an SSE event that forwards it to the client.

      The problem appears when I refresh the browser. The watch pipeline seems to load up one extra emission per refresh, and it then emits all of them on any data change.

      What gets emitted? Duplicates of the same data event. It's like a gun being loaded up and then firing all the bullets at once. If I refresh again, the 5 previous refreshes turn into 10 firings of the data event.

      This looks like a very bad memory leak to me. At any time I expect only a single data emission from the watch event via .on('change', next => ...).

      I see the duplicates in two places, the console logs on both the client and the server, so the repeated emissions really are being piped through.
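The symptom described above is consistent with handlers accumulating on a shared event source: if every refresh registers another handler and none is ever removed, a single change fires all of them. The following is only a minimal sketch of that mechanism, using Node's built-in `EventEmitter` as a stand-in; `source`, `delivered`, and `simulateRefresh` are hypothetical names and not part of the driver or of the code in this report.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical stand-in for a shared source of change events
// (not the driver's ChangeStream).
const source = new EventEmitter();
const delivered = [];

// Each simulated refresh registers one more handler and never
// removes the handler left behind by the previous page load.
function simulateRefresh(id) {
  source.on('change', doc => delivered.push({ handler: id, doc }));
}

simulateRefresh(1);
simulateRefresh(2);
simulateRefresh(3);

// One change is delivered once per handler that is still registered.
source.emit('change', { value: 42 });

console.log(delivered.length);               // 3
console.log(source.listenerCount('change')); // 3
```

With three simulated refreshes, one change produces three deliveries, which matches the "loaded up and fired at once" behaviour seen in the logs.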

       

      Here is the code I am using to watch the collection.

      ```javascript
      const testthis = async (client, req) => {
          const pipeline = [
              // { $match: { 'fullDocument.hashId': req.hashId } },
              { $addFields: { newField: 'this is an added field!' } }
          ];
          console.log('BREAKER BREAKER ########################################', req.hashId)
          const collection = client.collection(documentName);
          // Opens a new change stream on every call
          const changeStreamIterator = await collection.watch(pipeline);

          // Forward every change document to the shared subject
          changeStreamIterator.on('change', next => {
              console.log('THIS IS NEXT ============================== ', next.fullDocument);
              subject.next(next.fullDocument)
          });
      };
      ```
       
      ```javascript
      const initialize = (req, res) => {
          res.setHeader('Content-Type', 'text/event-stream');
          res.setHeader('Cache-Control', 'no-cache');
          res.setHeader('Connection', 'keep-alive');
          res.flushHeaders(); // flush the headers to establish SSE with client
          res.write('retry: 5000\n');
          console.log('req.userData ', req.userData);
          const channelStream = req.params.channel;
          // Runs on every incoming SSE request
          client.connect( err => {
              console.log('BREAKER')
              const collection = client.db(dbName);
              if (!err) {
                  console.log("Connected successfully to server.");
              } else {
                  console.log('Error in DB connection : ', JSON.stringify(err, undefined, 2));
              }
              let count = 0;
              if (count < 3) {
                  testthis(collection, 'req.userData');
                  count++
              }
              console.log('count', count);
              // testthis(collection, 'req.userData');

              // console.log('result 777 = ', result);
          });
          let messageId = 1;
          // Adds one more subscriber to the shared subject on every request
          subject.subscribe({
              next: (v) => {
                  console.log(`observerA: ${JSON.stringify(v)}`);
                  res.write(`event: ${channelStream}\n`);
                  res.write(`data: ${JSON.stringify(v)}\nid: ${messageId}\n\n\n`); // res.write() instead of res.send()
              }
          });
      };
      ```
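Given the "Works as Designed" resolution, one plausible reading is that each request opens its own change stream and subscription and nothing releases them when the browser disconnects. The sketch below shows per-connection cleanup under that assumption, again with `EventEmitter` objects standing in for the request and the change source; `changes`, `handleConnection`, and `written` are hypothetical names. In the real handler the equivalent step would presumably be calling `close()` on the change stream and `unsubscribe()` on the RxJS subscription when the request emits `'close'`.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical stand-in for a shared change source.
const changes = new EventEmitter();

// Register a handler for this connection only, and remove it
// as soon as the connection reports that it has closed.
function handleConnection(req, written) {
  const onChange = doc => written.push(doc);
  changes.on('change', onChange);
  req.once('close', () => changes.off('change', onChange));
}

const written = [];
let req;

// Simulate three page loads: the previous connection closes
// before the next one is opened.
for (let i = 0; i < 3; i++) {
  if (req) req.emit('close');
  req = new EventEmitter(); // stand-in for the HTTP request object
  handleConnection(req, written);
}

changes.emit('change', { value: 42 });

console.log(written.length);                  // 1
console.log(changes.listenerCount('change')); // 1
```

Because each handler is removed when its connection closes, only the live connection receives the change, no matter how many refreshes came before it.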
      StackOverflow Post

            Assignee: Thomas Reggi (Inactive), thomas.reggi@mongodb.com
            Reporter: Christian Holes, xtianus@live.com
            Votes: 0
            Watchers: 2
