Uploaded image for project: 'Motor'
  1. Motor
  2. MOTOR-320

Motor change stream misses events compared to equivalent pymongo code

    • Type: Icon: Bug Bug
    • Resolution: Fixed
    • Priority: Icon: Major - P3 Major - P3
    • 2.2
    • Affects Version/s: 1.3.1, 2.0
    • Component/s: None
    • Labels:
      None

      I want to get all the documents matching a query that are currently in a collection and then start watching for any new documents matching the same query that are inserted from that moment on.
      It's critical that I prevent a race condition that would cause me to miss any documents that are inserted/updated between the find()/aggregate() and the watch() commands.

      This is straightforward to implement with pymongo:

      import pymongo
      client = pymongo.MongoClient()
      client.testdb.drop_collection('foo')
      coll = client.testdb.foocoll.insert_one({'foo': 1})
      with coll.watch() as stream:
          for doc in coll.find():
              print(doc)
          coll.insert_one({'bar': 1})
          for event in stream:
              print(event)
      

      Output:

      {'_id': ObjectId('5c8a558f22f3390f5b030219'), 'foo': 1}
      {'_id': {'_data': '825C8A55900000000129295A1004F96DAC4F71B04A9E9C732D2ED1EA1E9946645F696400645C8A559022F3390F5B03021A0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1552569744, 1), 'fullDocument': {'_id': ObjectId('5c8a559022f3390f5b03021a'), 'bar': 1}, 'ns': {'db': 'testdb', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5c8a559022f3390f5b03021a')}}
      

      The same with Motor 2.0 however misses the event:

      import motor.motor_asyncio
      client = motor.motor_asyncio.AsyncIOMotorClient()
      await client.testdb.drop_collection('foo')
      coll = client.testdb.foo
      await coll.insert_one({'foo': 1})
      async with coll.watch() as stream:
          async for doc in coll.find():
              print(doc)
          await coll.insert_one({'bar': 1})
          async for doc in stream:
              print(doc)
      

      Output:

      {'_id': ObjectId('5c8a559822f3390f5b03021c'), 'foo': 1}
      

      There is a workaround, albeit cumbersome:

      import asyncio
      import bson
      import motor.motor_asyncio
      
      client = motor.motor_asyncio.AsyncIOMotorClient()
      await client.testdb.drop_collection('foo')
      coll = client.testdb.foo
      await coll.insert_one({'foo': 1})
      await asyncio.sleep(1) . # make sure we don't accidentally start watching before the drop
      
      server_status = await client.testdb.command({'serverStatus': 1})
      ts = bson.Timestamp(server_status['localTime'], 0)
      seen = set()
      async for doc in coll.find():
          seen.add(doc['_id'])
          print(doc)
      await coll.insert_one({'bar': 1})
      async with coll.watch(start_at_operation_time=ts) as stream:  
          async for event in stream:
              if event['fullDocument']['_id'] not in seen:
                  print(event)
      

      Output:

      {'_id': ObjectId('5c8a58ca22f33910561ecbaa'), 'foo': 1}
      {'_id': {'_data': '825C8A58CB0000000129295A1004A1D923FC5DFB453D808A3B410DE286C246645F696400645C8A58CB22F33910561ECBAB0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1552570571, 1), 'fullDocument': {'_id': ObjectId('5c8a58cb22f33910561ecbab'), 'bar': 1}, 'ns': {'db': 'testdb', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5c8a58cb22f33910561ecbab')}}
      

            Assignee:
            shane.harvey@mongodb.com Shane Harvey
            Reporter:
            gimperiale Guido Imperiale
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated:
              Resolved: