Details
Description
As part of DRIVERS-1995 there is a specification test called "Test new structure in ns document MUST NOT err".
The test passes in the Python driver in all cases except when it is run against a sharded cluster on the latest and rapid releases.
I verified this locally by:
- Installing mongo-mongo_release_macos_92d2be50b60d8fcf0114eacfcce0b79791aa4d9d_22_04_11_15_24_04
- Running a sharded cluster using "mlaunch init --binarypath $MONGODB_BIN --sharded 3 --replicaset --name repl0 --port $PORT --hostname localhost --setParameter enableTestCommands=1"
- Running python 3.10 and pymongo 4.1.0 in one shell with the following code:
from pymongo import MongoClient
c = MongoClient()
db = c.test
pipeline = [ { "$project": { "operationType": "insert", "ns.viewOn": "db.coll" } } ]
cursor = db.inventory.watch(pipeline=pipeline)
document = next(cursor)
- Running the following in another terminal window:
from pymongo import MongoClient
c = MongoClient()
db = c.test
db.inventory.insert_one({"username": "bob"})
- Observing the following error in the original window:
--------------------------------------------------------------------------- |
OperationFailure Traceback (most recent call last)
|
Input In [7], in <cell line: 1>() |
----> 1 document = next(cursor) |
|
|
File ~/pymongo/change_stream.py:262, in ChangeStream.next(self) |
230 """Advance the cursor. |
231 |
232 This method blocks until the next change document is returned or an |
(...)
|
259 Raises :exc:`StopIteration` if this ChangeStream is closed. |
260 """ |
261 while self.alive: |
--> 262 doc = self.try_next() |
263 if doc is not None: |
264 return doc |
|
|
File ~/pymongo/change_stream.py:316, in ChangeStream.try_next(self) |
313 # Attempt to get the next change with at most one getMore and at most |
314 # one resume attempt. |
315 try: |
--> 316 change = self._cursor._try_next(True) |
317 except (ConnectionFailure, CursorNotFound): |
318 self._resume() |
|
|
File ~/pymongo/command_cursor.py:292, in CommandCursor._try_next(self, get_more_allowed) |
290 """Advance the cursor blocking for at most one getMore command.""" |
291 if not len(self.__data) and not self.__killed and get_more_allowed: |
--> 292 self._refresh() |
293 if len(self.__data): |
294 return self.__data.popleft() |
|
|
File ~/pymongo/command_cursor.py:218, in CommandCursor._refresh(self) |
216 dbname, collname = self.__ns.split('.', 1) |
217 read_pref = self.__collection._read_preference_for(self.session) |
--> 218 self.__send_message( |
219 self._getmore_class(dbname, |
220 collname, |
221 self.__batch_size, |
222 self.__id, |
223 self.__collection.codec_options, |
224 read_pref, |
225 self.__session, |
226 self.__collection.database.client, |
227 self.__max_await_time_ms, |
228 self.__sock_mgr, False)) |
229 else: # Cursor id is zero nothing else to return |
230 self.__die(True) |
|
|
File ~/pymongo/command_cursor.py:164, in CommandCursor.__send_message(self, operation) |
162 client = self.__collection.database.client |
163 try: |
--> 164 response = client._run_operation( |
165 operation, self._unpack_response, address=self.__address) |
166 except OperationFailure as exc: |
167 if exc.code in _CURSOR_CLOSED_ERRORS: |
168 # Don't send killCursors because the cursor is already closed. |
|
|
File ~/pymongo/mongo_client.py:1201, in MongoClient._run_operation(self, operation, unpack_res, address) |
1196 def _cmd(session, server, sock_info, read_preference): |
1197 return server.run_operation( |
1198 sock_info, operation, read_preference, self._event_listeners, |
1199 unpack_res) |
-> 1201 return self._retryable_read( |
1202 _cmd, operation.read_preference, operation.session, |
1203 address=address, retryable=isinstance(operation, message._Query)) |
|
|
File ~/pymongo/mongo_client.py:1302, in MongoClient._retryable_read(self, func, read_pref, session, address, retryable) |
1300 assert last_error is not None |
1301 raise last_error |
-> 1302 return func(session, server, sock_info, read_pref) |
1303 except ServerSelectionTimeoutError: |
1304 if retrying: |
1305 # The application may think the write was never attempted |
1306 # if we raise ServerSelectionTimeoutError on the retry |
1307 # attempt. Raise the original exception instead. |
|
|
File ~/pymongo/mongo_client.py:1197, in MongoClient._run_operation.<locals>._cmd(session, server, sock_info, read_preference) |
1196 def _cmd(session, server, sock_info, read_preference): |
-> 1197 return server.run_operation( |
1198 sock_info, operation, read_preference, self._event_listeners, |
1199 unpack_res) |
|
|
File ~/pymongo/server.py:130, in Server.run_operation(self, sock_info, operation, read_preference, listeners, unpack_res) |
128 first = docs[0] |
129 operation.client._process_response(first, operation.session) |
--> 130 _check_command_response(first, sock_info.max_wire_version) |
131 except Exception as exc: |
132 if publish: |
|
|
File ~/pymongo/helpers.py:166, in _check_command_response(response, max_wire_version, allowable_errors, parse_write_concern_error) |
163 elif code == 43: |
164 raise CursorNotFound(errmsg, code, response, max_wire_version) |
--> 166 raise OperationFailure(errmsg, code, response, max_wire_version) |
|
|
OperationFailure: assertion src/mongo/db/exec/document_value/value.h:478, full error: {'ok': 0.0, 'errmsg': 'assertion src/mongo/db/exec/document_value/value.h:478', 'code': 8, 'codeName': 'UnknownError', '$clusterTime': {'clusterTime': Timestamp(1649786952, 1), 'signature': {'hash': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 'keyId': 0}}, 'operationTime': Timestamp(1649786952, 1)} |
Attachments
Issue Links
- has to be done before
-
JAVA-4579 Re-enable change stream unified test for 6.0+ sharded deployments
-
- Closed
-
-
DRIVERS-1995 Do not error when parsing change stream event documents
-
- Implementing
-
- is depended on by
-
PYTHON-3189 Change Stream event document missing "to" field for rename events
-
- Closed
-