Uploaded image for project: 'Node Driver'
  1. Node Driver
  2. NODE-1472

Inconsistent Cursor.map() Behavior with asyncIterator on Streams

    XMLWordPrintable

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major - P3
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: native
    • Labels:
      None

      Description

      I see an inconsistency in cursor and stream handling revolving around the Cursor.map() function that becomes evident when dealing with asyncIterator within Readable streams.

      Something odd is happening with content available in the document when iterating this way, and it seems to manifest consistently with the Array.prototype.reduce() not being recognized as present despite the data being consistently shown as an array. The following listing was used to test:

       

      const { MongoClient } = require('mongodb');
       
      const uri = 'mongodb://localhost';
       
      const log = data => console.log(JSON.stringify(data, undefined, 2));
       
      (async function() { 
       
        try {
          const client = await MongoClient.connect(uri);
          let db = client.db('test');
       
          let collection = db.collection('collection');
          await collection.deleteMany();
          await collection.insertOne({
            "a": 1,
            "b": [{ "k": 1, "v": 2 },{ "k": 2, "v": 3 }]
          });
       
          const altCursor = () =>
            collection.find()
              .stream({ transform: d =>
                ({ ...d, b: d.b.reduce((o,{ k, v }) => ({ ...o, [k]: v }),{}) })
              });
       
          const getCursor = () =>
            collection.find()
              .map(d =>
                ({ ...d, b: d.b.reduce((o,{ k, v }) => ({ ...o, [k]: v }),{}) })
              )
              .stream();
       
          // Ignores stream transform
          try {
            //let cursor = altCursor();
            let cursor = getCursor();
            while (await cursor.hasNext()) {
              let doc = await cursor.next();
              log(doc);
             }
          } catch(e) {
            console.error(e)
          }
       
          // Ignores stream transform
          try {
            //let cursor = altCursor();
            let cursor = getCursor();
            log( await cursor.toArray() );
          } catch(e) {
            console.error(e)
          }
       
          // Errors on standard cursor, stream transform okay
          try {
            let cursor = altCursor();
            //let cursor = getCursor();
            for await ( let doc of cursor ) {
              log(doc);
            }
          } catch(e) {
            console.error(e)
          }
       
          // map on an array however is fine
          let cursor = collection.find().map(({ b, ...d }) => ({
            ...d,
            b: b.map(e => ({ ...e, hi: 'there' }))
          }));
       
          for await ( let doc of cursor ) {
            log(doc);
          }
       
          console.log('done');
          await client.close();
        } catch(e) {
          console.error(e)
        } finally {
          process.exit()
        }})()
      

      The issue I consistently see is that the Cursor.map() using reduce() will consistently fail when applied in a for..await..of style iterator. The error in fact implies that the "array" is not actually there:

      /mongodb/lib/utils.js:132
            throw err;
            ^
       
      TypeError: d.b.reduce is not a function
      
      

      If you try to "force" the casting via Array.from() then any call to reduce() treats the array as being empty. However as the listing demonstrates, the Array.prototype.map() function does not have the same issue.

      The other slight oddity is that with a transform option on the stream instead of using Cursor.map() then the for..await..of style using Symbol.asyncIterator on the Readable stream works just as expected. That transform however is notably ignored by calling methods directly such as Cursor.next() or any derivative that makes that call, and yet having next() is required for the Symbol.asyncIterator compliance.

      Placing both the Cursor.map() and Cursor.stream() options including a transform function works as expected with the direct Cursor.next() which essentially ignores the transform, however it comes across the same problem when calling reduce() on the stream.

      It's unclear where exactly the problem is occurring apart from something very inconsistent is happening with Cursor.map()

      And the expected output from the supported options is as shown. Noting this will only output this way by not using Cursor.map() with the Array.reduce() on the call with for..await..of, but the last output shows Array.map() within Cursor.map() works as expected on the final output always:

       
      {
        "_id": "5b01463d3067111b0080b869",
        "a": 1,
        "b": {
          "1": 2,
          "2": 3
        }
      }
      [
        {
          "_id": "5b01463d3067111b0080b869",
          "a": 1,
          "b": {
            "1": 2,
            "2": 3
          }
        }
      ]
      {
        "_id": "5b01463d3067111b0080b869",
        "a": 1,
        "b": {
          "1": 2,
          "2": 3
        }
      }
      {
        "_id": "5b01463d3067111b0080b869",
        "a": 1,
        "b": [
          {
            "k": 1,
            "v": 2,
            "hi": "there"
          },
          {
            "k": 2,
            "v": 3,
            "hi": "there"
          }
        ]
      }
       
      
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                neillunn Neil Lunn
                Participants:
              • Votes:
                0 Vote for this issue
                Watchers:
                1 Start watching this issue

                Dates

                • Created:
                  Updated: