Concurrently closing a ChangeStream cursor is not thread safe


    • 🔵 Done

      Concurrently closing a ChangeStream cursor is not thread safe. The following test failure shows the race: while Thread-5 is iterating the change stream, the main thread closes it. Thread-5 then receives a CursorKilled error and calls close() itself, while the main thread's close() is still running. Thread-5 fails with this CursorKilled error:

       [2023/05/12 00:04:20.770]   test_concurrent_close (test_change_stream.TestClusterChangeStream)
       [2023/05/12 00:04:21.816] Ensure a ChangeStream can be closed from another thread. ... Exception in thread Thread-5:
       [2023/05/12 00:04:21.816] Traceback (most recent call last):
       [2023/05/12 00:04:21.816]   File "/opt/python/pypy3.7/lib-python/3/threading.py", line 926, in _bootstrap_inner
       [2023/05/12 00:04:21.816]     self.run()
       [2023/05/12 00:04:21.816]   File "/opt/python/pypy3.7/lib-python/3/threading.py", line 870, in run
       [2023/05/12 00:04:21.816]     self._target(*self._args, **self._kwargs)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/test/test_change_stream.py", line 331, in iterate_cursor
       [2023/05/12 00:04:21.816]     for _ in change_stream:
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
       [2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/change_stream.py", line 309, in next
       [2023/05/12 00:04:21.816]     doc = self.try_next()
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
       [2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/change_stream.py", line 368, in try_next
       [2023/05/12 00:04:21.816]     change = self._cursor._try_next(True)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 297, in _try_next
       [2023/05/12 00:04:21.816]     self._refresh()
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 231, in _refresh
       [2023/05/12 00:04:21.816]     self.__comment,
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 165, in __send_message
       [2023/05/12 00:04:21.816]     operation, self._unpack_response, address=self.__address
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
       [2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1345, in _run_operation
       [2023/05/12 00:04:21.816]     retryable=isinstance(operation, message._Query),
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/_csot.py", line 106, in csot_wrapper
       [2023/05/12 00:04:21.816]     return func(self, *args, **kwargs)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1464, in _retryable_read
       [2023/05/12 00:04:21.816]     return func(session, server, sock_info, read_pref)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1337, in _cmd
       [2023/05/12 00:04:21.816]     sock_info, operation, read_preference, self._event_listeners, unpack_res
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/helpers.py", line 279, in inner
       [2023/05/12 00:04:21.816]     return func(*args, **kwargs)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/server.py", line 135, in run_operation
       [2023/05/12 00:04:21.816]     _check_command_response(first, sock_info.max_wire_version)
       [2023/05/12 00:04:21.816]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/helpers.py", line 194, in _check_command_response
       [2023/05/12 00:04:21.816]     raise OperationFailure(errmsg, code, response, max_wire_version)
       [2023/05/12 00:04:21.816] pymongo.errors.OperationFailure: operation was interrupted, full error: {'ok': 0.0, 'errmsg': 'operation was interrupted', 'code': 237, 'codeName': 'CursorKilled', 'operationTime': Timestamp(1683849860, 32), '$clusterTime': {'clusterTime': Timestamp(1683849860, 32), 'signature': {'hash': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 'keyId': 0}}}
       [2023/05/12 00:04:21.816] ERROR (1.046s)
      

      Meanwhile, the main thread's call to change_stream.close() fails with an unexpected AttributeError:

       [2023/05/12 00:08:20.475] ERROR [1.046s]: test_concurrent_close (test_change_stream.TestClusterChangeStream)
       [2023/05/12 00:08:20.475] Ensure a ChangeStream can be closed from another thread.
       [2023/05/12 00:08:20.475] ----------------------------------------------------------------------
       [2023/05/12 00:08:20.475] Traceback (most recent call last):
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/test/test_change_stream.py", line 338, in test_concurrent_close
       [2023/05/12 00:08:20.475]     change_stream.close()
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/change_stream.py", line 261, in close
       [2023/05/12 00:08:20.475]     self._cursor.close()
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 107, in close
       [2023/05/12 00:08:20.475]     self.__die(True)
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/command_cursor.py", line 94, in __die
       [2023/05/12 00:08:20.475]     self.__explicit_session,
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1601, in _cleanup_cursor
       [2023/05/12 00:08:20.475]     session._end_session(lock=locks_allowed)
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/client_session.py", line 513, in _end_session
       [2023/05/12 00:08:20.475]     self._client._return_server_session(self._server_session, lock)
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/mongo_client.py", line 1747, in _return_server_session
       [2023/05/12 00:08:20.475]     return self._topology.return_server_session(server_session, lock)
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/topology.py", line 573, in return_server_session
       [2023/05/12 00:08:20.475]     server_session, self._description.logical_session_timeout_minutes
       [2023/05/12 00:08:20.475]   File "/data/mci/b0b5ba44c3a443a532edbe7839266c6f/src/pymongo/client_session.py", line 1098, in return_server_session
       [2023/05/12 00:08:20.475]     if server_session.timed_out(session_timeout_minutes):
       [2023/05/12 00:08:20.475] AttributeError: 'NoneType' object has no attribute 'timed_out'
      

      https://evergreen.mongodb.com/task/mongo_python_driver_tests_python_version_rhel8.4_test_ssl__platform~rhel84_auth_ssl~noauth_nossl_python_version~pypy3.7_coverage~coverage_test_4.2_sharded_cluster_bc1a513d1041fceabc8cb1e8372bcb0807343813_23_05_11_22_29_43
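      The AttributeError looks like a check-then-act race. Judging from the second traceback, the close path checks a shared server-session attribute, returns the session to the pool, and then clears the attribute. Two threads can both pass the check before either clears it, and the second one then hands None to the pool. The sketch below reproduces that interleaving deterministically without a MongoDB server. ServerSession, SessionPool, UnsafeSession and the after_check hook are hypothetical stand-ins. The assumption that the driver re-reads the attribute after checking it is inferred from the traceback, not taken from PyMongo's source.

```python
import itertools
import threading

class ServerSession:
    """Hypothetical stand-in for a pooled server session."""
    def timed_out(self, timeout_minutes):
        return False

class SessionPool:
    """Hypothetical pool that inspects each returned session."""
    def __init__(self):
        self.returned = []

    def return_server_session(self, server_session):
        # Raises AttributeError when server_session is None, as in the traceback.
        if not server_session.timed_out(30):
            self.returned.append(server_session)

class UnsafeSession:
    """Hypothetical session whose end_session() has an unguarded check-then-act."""
    def __init__(self, pool, after_check):
        self._pool = pool
        self._server_session = ServerSession()
        self._after_check = after_check  # test hook that forces an interleaving

    def end_session(self):
        if self._server_session is not None:  # check
            self._after_check()
            # act: re-reads the attribute, which another thread may have cleared
            self._pool.return_server_session(self._server_session)
            self._server_session = None

def race_two_closers():
    barrier = threading.Barrier(2)
    first_done = threading.Event()
    tickets = itertools.count()
    ticket_lock = threading.Lock()

    def after_check():
        barrier.wait()  # both threads have now passed the None check
        with ticket_lock:
            ticket = next(tickets)
        if ticket == 1:
            first_done.wait()  # the second thread resumes only after the first finished

    pool = SessionPool()
    session = UnsafeSession(pool, after_check)
    errors = []

    def worker():
        try:
            session.end_session()
        except AttributeError as exc:
            errors.append(exc)
        finally:
            first_done.set()

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return pool, errors

pool, errors = race_two_closers()
print(len(pool.returned), errors[0])
# 1 'NoneType' object has no attribute 'timed_out'
```

      One thread returns the session normally; the other fails with the same AttributeError seen in the main thread above.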

      We need to add a lock or a guard to prevent two threads from calling close() at the same time.
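      A minimal sketch of such a guard, assuming the cursor owns a single server session: close() swaps the session out under a threading.Lock, so exactly one caller takes ownership and every concurrent or later caller sees None and does nothing. GuardedCursor, SessionPool and ServerSession are hypothetical names for illustration, not PyMongo's API or its actual fix.

```python
import threading

class ServerSession:
    """Hypothetical stand-in for a pooled server session."""
    def timed_out(self, timeout_minutes):
        return False

class SessionPool:
    """Hypothetical pool that inspects each returned session."""
    def __init__(self):
        self.returned = []
        self._lock = threading.Lock()

    def return_server_session(self, server_session):
        # Would raise AttributeError if handed None.
        if not server_session.timed_out(30):
            with self._lock:
                self.returned.append(server_session)

class GuardedCursor:
    """Hypothetical cursor whose close() is safe to call from several threads."""
    def __init__(self, pool):
        self._pool = pool
        self._server_session = ServerSession()
        self._close_lock = threading.Lock()

    def close(self):
        # Atomically take ownership of the session; only one caller gets it.
        with self._close_lock:
            server_session, self._server_session = self._server_session, None
        # Clean up outside the lock so slow work does not block other callers.
        if server_session is not None:
            self._pool.return_server_session(server_session)

def close_concurrently(num_threads=8):
    pool = SessionPool()
    cursor = GuardedCursor(pool)
    barrier = threading.Barrier(num_threads)
    errors = []

    def worker():
        barrier.wait()  # release all threads together to maximize contention
        try:
            cursor.close()
        except Exception as exc:
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return pool, errors

pool, errors = close_concurrently()
print(len(pool.returned), errors)  # 1 []
```

      Doing the cleanup outside the lock keeps slow work (for example a killCursors round trip) from blocking other threads. Holding the lock for the whole of close() would also be correct, but it serializes every caller behind that network I/O.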

              Assignee: Unassigned
              Reporter: Shane Harvey
              Votes: 0
              Watchers: 3
