Type: Bug
Status: Closed
Priority: Major - P3
Resolution: Fixed
Affects Version/s: 1.3.1, 2.0
Fix Version/s: 2.2
Component/s: None
Labels: None
I want to get all the documents matching a query that are currently in a collection and then start watching for any new documents matching the same query that are inserted from that moment on.
It's critical that I prevent a race condition that would cause me to miss any documents that are inserted/updated between the find()/aggregate() and the watch() commands.
This is straightforward to implement with pymongo:
import pymongo

client = pymongo.MongoClient()
client.testdb.drop_collection('foo')
coll = client.testdb.foo
coll.insert_one({'foo': 1})

with coll.watch() as stream:
    for doc in coll.find():
        print(doc)
    coll.insert_one({'bar': 1})
    for event in stream:
        print(event)
Output:
{'_id': ObjectId('5c8a558f22f3390f5b030219'), 'foo': 1}
{'_id': {'_data': '825C8A55900000000129295A1004F96DAC4F71B04A9E9C732D2ED1EA1E9946645F696400645C8A559022F3390F5B03021A0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1552569744, 1), 'fullDocument': {'_id': ObjectId('5c8a559022f3390f5b03021a'), 'bar': 1}, 'ns': {'db': 'testdb', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5c8a559022f3390f5b03021a')}}
The same code with Motor 2.0, however, misses the event:
import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})

async with coll.watch() as stream:
    async for doc in coll.find():
        print(doc)
    await coll.insert_one({'bar': 1})
    async for doc in stream:
        print(doc)
Output:
{'_id': ObjectId('5c8a559822f3390f5b03021c'), 'foo': 1}
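A possible explanation is that the Motor change stream does not create its server-side cursor until the first async for iteration, so anything written between watch() and that first iteration is missed. If so, deferring the second insert until the stream is already being iterated should make the event show up. A diagnostic sketch of that idea (untested against Motor 2.0; the one-second delay is arbitrary and insert_later is just a throwaway helper):

import asyncio
import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})

async def insert_later():
    # Wait until the change stream below is (presumably) live before inserting.
    await asyncio.sleep(1)
    await coll.insert_one({'bar': 1})

async with coll.watch() as stream:
    async for doc in coll.find():
        print(doc)
    asyncio.ensure_future(insert_later())  # the insert happens while we block on the stream
    async for event in stream:
        print(event)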
There is a workaround, albeit cumbersome:
import asyncio
import bson
import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})
await asyncio.sleep(1)  # make sure we don't accidentally start watching before the drop

server_status = await client.testdb.command({'serverStatus': 1})
ts = bson.Timestamp(server_status['localTime'], 0)
seen = set()
async for doc in coll.find():
    seen.add(doc['_id'])
    print(doc)
await coll.insert_one({'bar': 1})

async with coll.watch(start_at_operation_time=ts) as stream:
    async for event in stream:
        if event['fullDocument']['_id'] not in seen:
            print(event)
Output:
{'_id': ObjectId('5c8a58ca22f33910561ecbaa'), 'foo': 1}
{'_id': {'_data': '825C8A58CB0000000129295A1004A1D923FC5DFB453D808A3B410DE286C246645F696400645C8A58CB22F33910561ECBAB0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1552570571, 1), 'fullDocument': {'_id': ObjectId('5c8a58cb22f33910561ecbab'), 'bar': 1}, 'ns': {'db': 'testdb', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5c8a58cb22f33910561ecbab')}}
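The serverStatus localTime is a wall clock and needs the sleep above to stay clear of the drop; a cluster timestamp taken from any command reply issued after the initial insert should be usable with start_at_operation_time directly. A sketch of that variant, assuming the command reply exposes an operationTime field as a bson.Timestamp on replica sets (untested; change streams require a replica set anyway):

import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})

# Take a cluster timestamp from an arbitrary command reply instead of
# serverStatus localTime; no sleep needed since this is not a wall clock.
reply = await client.testdb.command('ping')
ts = reply['operationTime']  # assumption: a bson.Timestamp in the reply on replica sets

seen = set()
async for doc in coll.find():
    seen.add(doc['_id'])
    print(doc)
await coll.insert_one({'bar': 1})

async with coll.watch(start_at_operation_time=ts) as stream:
    async for event in stream:
        if event['fullDocument']['_id'] not in seen:
            print(event)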