Uploaded image for project: 'Python Driver'
  1. Python Driver
  2. PYTHON-4659

Async does not work with PyOpenSSL

    • Type: Icon: Task Task
    • Resolution: Fixed
    • Priority: Icon: Unknown Unknown
    • 4.9
    • Affects Version/s: None
    • Component/s: None
    • None
    • Python Drivers
    • Not Needed
    • Hide

      1. What would you like to communicate to the user about this feature?
      2. Would you like the user to see examples of the syntax and/or executable code and its output?
      3. Which versions of the driver/connector does this apply to?

      Show
      1. What would you like to communicate to the user about this feature? 2. Would you like the user to see examples of the syntax and/or executable code and its output? 3. Which versions of the driver/connector does this apply to?

      The fix for PYTHON-4618 has a bug which manifests as a a few different errors in tests that send large bulk writes that need to be spilt. For example one error is:

       [2024/08/12 10:43:23.907] FAILURE: pymongo.errors.OperationFailure: assertion src/mongo/util/hex.cpp:100, full error: {'operationTime': Timestamp(1723484597, 9), 'ok': 0.0, 'errmsg': 'assertion src/mongo/util/hex.cpp:100', 'code': 8, 'codeName': 'UnknownError', '$clusterTime': {'clusterTime': Timestamp(1723484597, 9), 'signature': {'hash': b'\xadc\x8d\x00\xb1gUt\x140\xd8t\x16\xfa\x14AO\xc7E\xa9', 'keyId': 7402309674332061700}}} ()
       [2024/08/12 10:43:23.907] self = <test.asynchronous.test_collection.AsyncTestCollection testMethod=test_large_limit>
       [2024/08/12 10:43:23.907]     async def test_large_limit(self):
       [2024/08/12 10:43:23.907]         db = self.db
       [2024/08/12 10:43:23.907]         await db.drop_collection("test_large_limit")
       [2024/08/12 10:43:23.907]         await db.test_large_limit.create_index([("x", 1)])
       [2024/08/12 10:43:23.907]         my_str = "mongomongo" * 1000
       [2024/08/12 10:43:23.907]     
       [2024/08/12 10:43:23.907] >       await db.test_large_limit.insert_many({"x": i, "y": my_str} for i in range(2000))
       [2024/08/12 10:43:23.907] test/asynchronous/test_collection.py:1651: 
       [2024/08/12 10:43:23.907] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
       [2024/08/12 10:43:23.907] pymongo/_csot.py:110: in csot_wrapper
       [2024/08/12 10:43:23.907]     return await func(self, *args, **kwargs)
       [2024/08/12 10:43:23.907] pymongo/asynchronous/collection.py:949: in insert_many
       [2024/08/12 10:43:23.907]     await blk.execute(write_concern, session, _Op.INSERT)
       [2024/08/12 10:43:23.907] pymongo/asynchronous/bulk.py:739: in execute
       [2024/08/12 10:43:23.907]     return await self.execute_command(generator, write_concern, session, operation)
       [2024/08/12 10:43:23.907] pymongo/asynchronous/bulk.py:594: in execute_command
       [2024/08/12 10:43:23.907]     _ = await client._retryable_write(
       [2024/08/12 10:43:23.907] pymongo/asynchronous/mongo_client.py:1873: in _retryable_write
       [2024/08/12 10:43:23.907]     return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
       [2024/08/12 10:43:23.907] pymongo/asynchronous/mongo_client.py:1759: in _retry_with_session
       [2024/08/12 10:43:23.907]     return await self._retry_internal(
       [2024/08/12 10:43:23.907] pymongo/_csot.py:110: in csot_wrapper
       [2024/08/12 10:43:23.907]     return await func(self, *args, **kwargs)
       [2024/08/12 10:43:23.907] pymongo/asynchronous/mongo_client.py:1794: in _retry_internal
       [2024/08/12 10:43:23.907]     return await _ClientConnectionRetryable(
       [2024/08/12 10:43:23.907] pymongo/asynchronous/mongo_client.py:2530: in run
       [2024/08/12 10:43:23.907]     return await self._read() if self._is_read else await self._write()
       [2024/08/12 10:43:23.907] pymongo/asynchronous/mongo_client.py:2650: in _write
       [2024/08/12 10:43:23.907]     return await self._func(self._session, conn, self._retryable)  # type: ignore
       [2024/08/12 10:43:23.907] pymongo/asynchronous/bulk.py:583: in retryable_bulk
       [2024/08/12 10:43:23.907]     await self._execute_command(
       [2024/08/12 10:43:23.907] pymongo/asynchronous/bulk.py:528: in _execute_command
       [2024/08/12 10:43:23.907]     result, to_send = await self._execute_batch(bwc, cmd, ops, client)
       [2024/08/12 10:43:23.907] pymongo/asynchronous/bulk.py:451: in _execute_batch
       [2024/08/12 10:43:23.907]     result = await self.write_command(bwc, cmd, request_id, msg, to_send, client)  # type: ignore[arg-type]
       [2024/08/12 10:43:23.907] pymongo/asynchronous/helpers.py:45: in inner
       [2024/08/12 10:43:23.907]     return await func(*args, **kwargs)
       [2024/08/12 10:43:23.907] pymongo/asynchronous/bulk.py:263: in write_command
       [2024/08/12 10:43:23.907]     reply = await bwc.conn.write_command(request_id, msg, bwc.codec)  # type: ignore[misc]
       [2024/08/12 10:43:23.907] pymongo/asynchronous/pool.py:628: in write_command
       [2024/08/12 10:43:23.907]     helpers_shared._check_command_response(result, self.max_wire_version)
      

      https://spruce.mongodb.com/task/mongo_python_driver_tests_pyopenssl__platform~rhel8_auth~auth_ssl~ssl_python_version~3.10_pyopenssl~enabled_test_4.2_replica_set_a232b657d01030d2bc2b40db068ebb49f8b964a4_24_08_12_17_23_43?execution=0&sortBy=STATUS&sortDir=ASC

      Other errors include:

       [2024/08/12 10:41:58.416] FAILURE: pymongo.errors.ClientBulkWriteException: batch op errors occurred, full error: {'anySuccessful': False, 'error': OperationFailure('Not null terminated string in element with field name \'document.a\' in object with unknown _id, full error: {\'ok\': 0.0, \'errmsg\': "Not null terminated string in element with field name \'document.a\' in object with unknown _id", \'code\': 22, \'codeName\': \'InvalidBSON\', \'$clusterTime\': {\'clusterTime\': Timestamp(1723484506, 1), \'signature\': {\'hash\': b\'\\xf0\\x89\\xd9\\xf5K\\x05\\x02\\xebph\\xc5D(\\xc0+\\xd3\\xb8\\x8b\\x85\\x11\', \'keyId\': 7402309287785005062}}, \'operationTime\': Timestamp(1723484506, 1)}'), 'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 0, 'nUpserted': 0, 'nMatched': 0, 'nModified': 0, 'nDeleted': 0, 'insertResults': {}, 'updateResults': {}, 'deleteResults': {}} ()
       [2024/08/12 10:41:58.416] self = <test.asynchronous.test_client_bulk_write.TestClientBulkWriteCRUD testMethod=test_batch_splits_if_new_namespace_is_too_large>
       [2024/08/12 10:41:58.416]     @async_client_context.require_version_min(8, 0, 0, -24)
       [2024/08/12 10:41:58.416]     async def test_batch_splits_if_new_namespace_is_too_large(self):
       [2024/08/12 10:41:58.416]         listener = OvertCommandListener()
       [2024/08/12 10:41:58.416]         client = await async_rs_or_single_client(event_listeners=[listener])
       [2024/08/12 10:41:58.416]         self.addAsyncCleanup(client.aclose)
       [2024/08/12 10:41:58.416]     
       [2024/08/12 10:41:58.416]         num_models, models = await self._setup_namespace_test_models()
       [2024/08/12 10:41:58.416]         c_repeated = "c" * 200
       [2024/08/12 10:41:58.416]         namespace = f"db.{c_repeated}"
       [2024/08/12 10:41:58.416]         models.append(
       [2024/08/12 10:41:58.416]             InsertOne(
       [2024/08/12 10:41:58.416]                 namespace=namespace,
       [2024/08/12 10:41:58.416]                 document={"a": "b"},
       [2024/08/12 10:41:58.416]             )
       [2024/08/12 10:41:58.416]         )
       [2024/08/12 10:41:58.416]         self.addAsyncCleanup(client.db["coll"].drop)
       [2024/08/12 10:41:58.416]         self.addAsyncCleanup(client.db[c_repeated].drop)
       [2024/08/12 10:41:58.416]     
       [2024/08/12 10:41:58.416]         # Batch splitting required.
       [2024/08/12 10:41:58.416] >       result = await client.bulk_write(models=models)
      

      And:

       [2024/08/12 10:52:06.244] FAILURE: pymongo.errors.OperationFailure: BSON object not terminated with EOO in element with field name '?' in object with unknown _id, full error: {'ok': 0.0, 'errmsg': "BSON object not terminated with EOO in element with field name '?' in object with unknown _id", 'code': 22, 'codeName': 'InvalidBSON'} ()
       [2024/08/12 10:52:06.244] self = <test.asynchronous.test_cursor.TestCursor testMethod=test_close_kills_cursor_synchronously>
       [2024/08/12 10:52:06.244]     async def test_close_kills_cursor_synchronously(self):
       [2024/08/12 10:52:06.244]         # Kill any cursors possibly queued up by previous tests.
       [2024/08/12 10:52:06.244]         gc.collect()
       [2024/08/12 10:52:06.244]         await self.client._process_periodic_tasks()
       [2024/08/12 10:52:06.244]     
       [2024/08/12 10:52:06.244]         listener = AllowListEventListener("killCursors")
       [2024/08/12 10:52:06.244]         client = await async_rs_or_single_client(event_listeners=[listener])
       [2024/08/12 10:52:06.244]         self.addAsyncCleanup(client.aclose)
       [2024/08/12 10:52:06.244]         coll = client[self.db.name].test_close_kills_cursors
       [2024/08/12 10:52:06.244]     
       [2024/08/12 10:52:06.244]         # Add some test data.
       [2024/08/12 10:52:06.244]         docs_inserted = 1000
       [2024/08/12 10:52:06.244] >       await coll.insert_many([{"i": i} for i in range(docs_inserted)])
       [2024/08/12 10:52:06.244] test/asynchronous/test_cursor.py:1270: 
       [2024/08/12 10:52:06.244] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
       [2024/08/12 10:52:06.244] pymongo/_csot.py:110: in csot_wrapper
       [2024/08/12 10:52:06.244]     return await func(self, *args, **kwargs)
       [2024/08/12 10:52:06.244] pymongo/asynchronous/collection.py:949: in insert_many
       [2024/08/12 10:52:06.244]     await blk.execute(write_concern, session, _Op.INSERT)
       [2024/08/12 10:52:06.244] pymongo/asynchronous/bulk.py:739: in execute
       [2024/08/12 10:52:06.244]     return await self.execute_command(generator, write_concern, session, operation)
       [2024/08/12 10:52:06.244] pymongo/asynchronous/bulk.py:594: in execute_command
       [2024/08/12 10:52:06.244]     _ = await client._retryable_write(
       [2024/08/12 10:52:06.244] pymongo/asynchronous/mongo_client.py:1873: in _retryable_write
       [2024/08/12 10:52:06.244]     return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
       [2024/08/12 10:52:06.244] pymongo/asynchronous/mongo_client.py:1759: in _retry_with_session
       [2024/08/12 10:52:06.244]     return await self._retry_internal(
       [2024/08/12 10:52:06.244] pymongo/_csot.py:110: in csot_wrapper
       [2024/08/12 10:52:06.244]     return await func(self, *args, **kwargs)
       [2024/08/12 10:52:06.244] pymongo/asynchronous/mongo_client.py:1794: in _retry_internal
       [2024/08/12 10:52:06.244]     return await _ClientConnectionRetryable(
       [2024/08/12 10:52:06.244] pymongo/asynchronous/mongo_client.py:2530: in run
       [2024/08/12 10:52:06.244]     return await self._read() if self._is_read else await self._write()
       [2024/08/12 10:52:06.244] pymongo/asynchronous/mongo_client.py:2650: in _write
       [2024/08/12 10:52:06.244]     return await self._func(self._session, conn, self._retryable)  # type: ignore
       [2024/08/12 10:52:06.244] pymongo/asynchronous/bulk.py:583: in retryable_bulk
       [2024/08/12 10:52:06.244]     await self._execute_command(
       [2024/08/12 10:52:06.244] pymongo/asynchronous/bulk.py:528: in _execute_command
       [2024/08/12 10:52:06.244]     result, to_send = await self._execute_batch(bwc, cmd, ops, client)
       [2024/08/12 10:52:06.244] pymongo/asynchronous/bulk.py:451: in _execute_batch
       [2024/08/12 10:52:06.244]     result = await self.write_command(bwc, cmd, request_id, msg, to_send, client)  # type: ignore[arg-type]
       [2024/08/12 10:52:06.244] pymongo/asynchronous/helpers.py:45: in inner
       [2024/08/12 10:52:06.244]     return await func(*args, **kwargs)
       [2024/08/12 10:52:06.244] pymongo/asynchronous/bulk.py:263: in write_command
       [2024/08/12 10:52:06.244]     reply = await bwc.conn.write_command(request_id, msg, bwc.codec)  # type: ignore[misc]
       [2024/08/12 10:52:06.244] pymongo/asynchronous/pool.py:628: in write_command
       [2024/08/12 10:52:06.244]     helpers_shared._check_command_response(result, self.max_wire_version)
      

      https://spruce.mongodb.com/task/mongo_python_driver_tests_pyopenssl__platform~rhel8_auth~auth_ssl~ssl_python_version~3.12_pyopenssl~enabled_test_7.0_standalone_a232b657d01030d2bc2b40db068ebb49f8b964a4_24_08_12_17_23_43?execution=0&sortBy=STATUS&sortDir=ASC

      My theory is that when using PyOpenSSL large socket writes are not completely sent via our implementation of sendall.

            Assignee:
            shane.harvey@mongodb.com Shane Harvey
            Reporter:
            shane.harvey@mongodb.com Shane Harvey
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

              Created:
              Updated:
              Resolved: