- Type: Bug
- Resolution: Gone away
- Priority: Unknown
- None
- Affects Version/s: None
- Component/s: None
Closing a ChangeStream cursor concurrently is not thread-safe. The following test failure shows the race: Thread-5 is iterating the change stream when the main thread closes it. Thread-5 receives a CursorKilled error (an OperationFailure with code 237) and calls close() while the main thread is still inside its own close() call. The main thread then fails with an unexpected AttributeError. The traceback from Thread-5:
[2023/05/12 00:04:20.770] test_concurrent_close (test_change_stream.TestClusterChangeStream)
[2023/05/12 00:04:21.816] Ensure a ChangeStream can be closed from another thread. ... Exception in thread Thread-5:
[2023/05/12 00:04:21.816] Traceback (most recent call last):
[2023/05/12 00:04:21.816]   File "/opt/python/pypy3.7/lib-python/3/threading.py", line 926, in _bootstrap_inner
[2023/05/12 00:04:21.816]     self.run()
[2023/05/12 00:04:21.816]   File "/opt/python/pypy3.7/lib-python/3/threading.py", line 870, in run
[2023/05/12 00:04:21.816]     self._target(*self._args, **self._kwargs)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/test/test_change_stream.py", line 331, in iterate_cursor
[2023/05/12 00:04:21.816]     for _ in change_stream:
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
[2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/change_stream.py", line 309, in next
[2023/05/12 00:04:21.816]     doc = self.try_next()
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
[2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/change_stream.py", line 368, in try_next
[2023/05/12 00:04:21.816]     change = self._cursor._try_next(True)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 297, in _try_next
[2023/05/12 00:04:21.816]     self._refresh()
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 231, in _refresh
[2023/05/12 00:04:21.816]     self.__comment,
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 165, in __send_message
[2023/05/12 00:04:21.816]     operation, self._unpack_response, address=self.__address
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
[2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1345, in _run_operation
[2023/05/12 00:04:21.816]     retryable=isinstance(operation, message._Query),
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
[2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1464, in _retryable_read
[2023/05/12 00:04:21.816]     return func(session, server, sock_info, read_pref)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1337, in _cmd
[2023/05/12 00:04:21.816]     sock_info, operation, read_preference, self._event_listeners, unpack_res
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/helpers.py", line 279, in inner
[2023/05/12 00:04:21.816]     return func(*args, **kwargs)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/server.py", line 135, in run_operation
[2023/05/12 00:04:21.816]     _check_command_response(first, sock_info.max_wire_version)
[2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/helpers.py", line 194, in _check_command_response
[2023/05/12 00:04:21.816]     raise OperationFailure(errmsg, code, response, max_wire_version)
[2023/05/12 00:04:21.816] pymongo.errors.OperationFailure: operation was interrupted, full error: {'ok': 0.0, 'errmsg': 'operation was interrupted', 'code': 237, 'codeName': 'CursorKilled', 'operationTime': Timestamp(1683849860, 32), '$clusterTime': {'clusterTime': Timestamp(1683849860, 32), 'signature': {'hash': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 'keyId': 0}}}
[2023/05/12 00:04:21.816] ERROR (1.046s)
The main thread, which calls change_stream.close(), fails with:
[2023/05/12 00:08:20.475] ERROR [1.046s]: test_concurrent_close (test_change_stream.TestClusterChangeStream)
[2023/05/12 00:08:20.475] Ensure a ChangeStream can be closed from another thread.
[2023/05/12 00:08:20.475] ----------------------------------------------------------------------
[2023/05/12 00:08:20.475] Traceback (most recent call last):
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/test/test_change_stream.py", line 338, in test_concurrent_close
[2023/05/12 00:08:20.475]     change_stream.close()
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/change_stream.py", line 261, in close
[2023/05/12 00:08:20.475]     self._cursor.close()
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 107, in close
[2023/05/12 00:08:20.475]     self.__die(True)
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 94, in __die
[2023/05/12 00:08:20.475]     self.__explicit_session,
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1601, in _cleanup_cursor
[2023/05/12 00:08:20.475]     session._end_session(lock=locks_allowed)
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/client_session.py", line 513, in _end_session
[2023/05/12 00:08:20.475]     self._client._return_server_session(self._server_session, lock)
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1747, in _return_server_session
[2023/05/12 00:08:20.475]     return self._topology.return_server_session(server_session, lock)
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/topology.py", line 573, in return_server_session
[2023/05/12 00:08:20.475]     server_session, self._description.logical_session_timeout_minutes
[2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/client_session.py", line 1098, in return_server_session
[2023/05/12 00:08:20.475]     if server_session.timed_out(session_timeout_minutes):
[2023/05/12 00:08:20.475] AttributeError: 'NoneType' object has no attribute 'timed_out'
We need to add a lock or a guard to prevent two threads from calling close() at the same time.
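One possible shape for such a guard is sketched below. This is not PyMongo code and not necessarily how the issue was resolved: `FakeSession`, `GuardedCursor`, and `close_from_many_threads` are hypothetical names used only for illustration. The sketch assumes the failure comes from two threads both trying to release the same session, so close() detaches the session under a lock and only the thread that obtained it performs the cleanup.

```python
import threading


class FakeSession:
    """Hypothetical stand-in for a session that must be ended exactly once."""

    def __init__(self):
        self.end_calls = 0

    def end(self):
        self.end_calls += 1


class GuardedCursor:
    """Hypothetical cursor wrapper whose close() is safe to call concurrently."""

    def __init__(self, session):
        self._session = session
        self._close_lock = threading.Lock()

    def close(self):
        # Detach the session under the lock so only one caller can own it.
        with self._close_lock:
            session, self._session = self._session, None
        # Clean up outside the lock; callers that lost the race see None.
        if session is not None:
            session.end()


def close_from_many_threads(cursor, n_threads=8):
    """Call cursor.close() from n_threads at once; return raised exceptions."""
    errors = []
    barrier = threading.Barrier(n_threads)

    def worker():
        barrier.wait()  # line the threads up to maximise contention
        try:
            cursor.close()
        except Exception as exc:  # collected for the caller to inspect
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return errors
```

With this pattern close() is idempotent: a second call, whether concurrent or later, finds no session to release and returns without error. Swapping the reference under the lock and cleaning up outside it keeps the critical section short, which matters if the real cleanup performs network I/O.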
- causes: PYTHON-2232 Test failure - test_change_stream.TestCollectionChangeStream.test_concurrent_close (Closed)