PYTHON-4660 (project: Python Driver)

bulk_write fails with AttributeError when a batch fails with InvalidBSON

    • Type: Bug
    • Resolution: Fixed
    • Priority: Unknown
    • Fix Version/s: 4.9
    • Affects Version/s: None
    • Component/s: None
    • None
    • Python Drivers
    • Not Needed

      bulk_write fails with AttributeError when a batch fails with InvalidBSON:

       [2024/08/12 10:41:58.788] FAILURE: AttributeError: 'InvalidBSON' object has no attribute 'details' ()
       self = <test.asynchronous.test_client_bulk_write.TestClientBulkWriteCRUD testMethod=test_collects_write_errors_across_batches_ordered>
           @async_client_context.require_version_min(8, 0, 0, -24)
           async def test_collects_write_errors_across_batches_ordered(self):
               listener = OvertCommandListener()
               client = await async_rs_or_single_client(event_listeners=[listener])
               self.addAsyncCleanup(client.aclose)

               collection = client.db["coll"]
               self.addAsyncCleanup(collection.drop)
               await collection.drop()
               await collection.insert_one(document={"_id": 1})

               models = []
               for _ in range(self.max_write_batch_size + 1):
                   models.append(
                       InsertOne(
                           namespace="db.coll",
                           document={"_id": 1},
                       )
                   )

               with self.assertRaises(ClientBulkWriteException) as context:
       >           await client.bulk_write(models=models, ordered=True)
       test/asynchronous/test_client_bulk_write.py:215: 
       _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
       pymongo/_csot.py:110: in csot_wrapper
           return await func(self, *args, **kwargs)
       pymongo/asynchronous/mongo_client.py:2349: in bulk_write
           return await blk.execute(session, _Op.BULK_WRITE)
       pymongo/asynchronous/client_bulk.py:783: in execute
           result = await self.execute_command(session, operation)
       pymongo/asynchronous/client_bulk.py:651: in execute_command
           await self.client._retryable_write(
       pymongo/asynchronous/mongo_client.py:1873: in _retryable_write
           return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
       pymongo/asynchronous/mongo_client.py:1759: in _retry_with_session
           return await self._retry_internal(
       pymongo/_csot.py:110: in csot_wrapper
           return await func(self, *args, **kwargs)
       pymongo/asynchronous/mongo_client.py:1794: in _retry_internal
           return await _ClientConnectionRetryable(
       pymongo/asynchronous/mongo_client.py:2530: in run
           return await self._read() if self._is_read else await self._write()
       pymongo/asynchronous/mongo_client.py:2650: in _write
           return await self._func(self._session, conn, self._retryable)  # type: ignore
       pymongo/asynchronous/client_bulk.py:642: in retryable_bulk
           await self._execute_command(
       _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
           async def _execute_command(
               self,
               write_concern: WriteConcern,
               session: Optional[AsyncClientSession],
               conn: AsyncConnection,
               op_id: int,
               retryable: bool,
               full_result: MutableMapping[str, Any],
               final_write_concern: Optional[WriteConcern] = None,
           ) -> None:
               """Internal helper for executing batches of bulkWrite commands."""
               db_name = "admin"
               cmd_name = "bulkWrite"
               listeners = self.client._event_listeners

               # AsyncConnection.command validates the session, but we use
               # AsyncConnection.write_command
               conn.validate_session(self.client, session)

               bwc = self.bulk_ctx_class(
                   db_name,
                   cmd_name,
                   conn,
                   op_id,
                   listeners,  # type: ignore[arg-type]
                   session,
                   self.client.codec_options,
               )

               while self.idx_offset < self.total_ops:
                   # If this is the last possible batch, use the
                   # final write concern.
                   if self.total_ops - self.idx_offset <= bwc.max_write_batch_size:
                       write_concern = final_write_concern or write_concern

                   # Construct the server command, specifying the relevant options.
                   cmd = {"bulkWrite": 1}
                   cmd["errorsOnly"] = not self.verbose_results
                   cmd["ordered"] = self.ordered  # type: ignore[assignment]
                   not_in_transaction = session and not session.in_transaction
                   if not_in_transaction or not session:
                       _csot.apply_write_concern(cmd, write_concern)
                   if self.bypass_doc_val is not None:
                       cmd["bypassDocumentValidation"] = self.bypass_doc_val
                   if self.comment:
                       cmd["comment"] = self.comment  # type: ignore[assignment]
                   if self.let:
                       cmd["let"] = self.let

                   if session:
                       # Start a new retryable write unless one was already
                       # started for this command.
                       if retryable and not self.started_retryable_write:
                           session._start_retryable_write()
                           self.started_retryable_write = True
                       session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn)
                   conn.send_cluster_time(cmd, session, self.client)
                   conn.add_server_api(cmd)
                   # CSOT: apply timeout before encoding the command.
                   conn.apply_timeout(self.client, cmd)
                   ops = islice(self.ops, self.idx_offset, None)

                   # Run as many ops as possible in one server command.
                   if write_concern.acknowledged:
                       raw_result, to_send_ops, _ = await self._execute_batch(bwc, cmd, ops)  # type: ignore[arg-type]
                       result = copy.deepcopy(raw_result)

                       # Top-level server/network error.
                       if result.get("error"):
                           error = result["error"]
                           retryable_top_level_error = (
       >                       isinstance(error.details, dict)
                               and error.details.get("code", 0) in _RETRYABLE_ERROR_CODES
                           )
       E                   AttributeError: 'InvalidBSON' object has no attribute 'details'
       pymongo/asynchronous/client_bulk.py:553: AttributeError
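The test builds max_write_batch_size + 1 InsertOne models so the command has to be split into at least two batches, and the loop in _execute_command switches to the final write concern once the remaining operations fit in a single batch. A minimal sketch of that count-based splitting, assuming batches are limited by operation count alone (the driver also enforces message-size limits, which this ignores); split_into_batches is a hypothetical helper, not a PyMongo function:

```python
from typing import Iterator, List, Sequence, Tuple


def split_into_batches(
    ops: Sequence[object], max_write_batch_size: int
) -> Iterator[Tuple[List[object], bool]]:
    """Hypothetical helper: yield (batch, is_last_possible_batch) pairs.

    Splits by operation count only; real bulkWrite batching is also
    bounded by message size.
    """
    idx_offset = 0
    total_ops = len(ops)
    while idx_offset < total_ops:
        # Same condition as in _execute_command: if everything left fits in
        # one batch, this batch would be sent with the final write concern.
        is_last = total_ops - idx_offset <= max_write_batch_size
        batch = list(ops[idx_offset : idx_offset + max_write_batch_size])
        idx_offset += len(batch)
        yield batch, is_last


# Hypothetical small limit; a real server reports maxWriteBatchSize in its hello reply.
batches = list(split_into_batches([{"_id": 1}] * 4, max_write_batch_size=3))
print([(len(b), last) for b, last in batches])  # [(3, False), (1, True)]
```

With max_write_batch_size + 1 operations, exactly one operation spills into a second batch, which is what lets the test check that write errors are collected across batches.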
      

      Failing Evergreen task: https://spruce.mongodb.com/task/mongo_python_driver_tests_pyopenssl__platform~rhel8_auth~auth_ssl~ssl_python_version~3.12_pyopenssl~enabled_test_8.0_replica_set_a232b657d01030d2bc2b40db068ebb49f8b964a4_24_08_12_17_23_43?execution=0&sortBy=STATUS&sortDir=ASC

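      We should guard the access to "error.details". The exception stored in result["error"] is not always a server error carrying a details document; here it is an InvalidBSON, which has no details attribute, so the retryability check itself raises AttributeError and the user sees that instead of the original error.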
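One way to guard the access is to look up details with getattr and a default, so that in this sketch an exception without a server reply, such as InvalidBSON, is treated as a non-retryable top-level error rather than crashing the check. The code is illustrative only: FakeOperationFailure, FakeInvalidBSON, original_check and is_retryable_top_level_error are hypothetical stand-ins, the error-code set is a small subset chosen for the example (PyMongo's real _RETRYABLE_ERROR_CODES lives inside the driver), and this is not necessarily how the fix for PYTHON-4660 was implemented.

```python
from typing import Any

# Illustrative subset of retryable server error codes: HostUnreachable (6),
# HostNotFound (7), NetworkTimeout (89), ShutdownInProgress (91),
# PrimarySteppedDown (189), SocketException (9001).
RETRYABLE_ERROR_CODES = {6, 7, 89, 91, 189, 9001}


class FakeOperationFailure(Exception):
    """Hypothetical stand-in for a server error that carries a 'details' document."""

    def __init__(self, details: dict) -> None:
        super().__init__(details.get("errmsg", ""))
        self.details = details


class FakeInvalidBSON(Exception):
    """Hypothetical stand-in for bson.errors.InvalidBSON: no 'details' attribute."""


def original_check(error: BaseException) -> bool:
    # The unguarded expression from the traceback; raises AttributeError
    # when 'error' has no 'details' attribute.
    return isinstance(error.details, dict) and error.details.get("code", 0) in RETRYABLE_ERROR_CODES  # type: ignore[attr-defined]


def is_retryable_top_level_error(error: BaseException) -> bool:
    # getattr with a default returns None instead of raising for
    # exceptions that have no 'details', so the check simply yields False.
    details: Any = getattr(error, "details", None)
    return isinstance(details, dict) and details.get("code", 0) in RETRYABLE_ERROR_CODES


print(is_retryable_top_level_error(FakeOperationFailure({"code": 91})))     # True
print(is_retryable_top_level_error(FakeOperationFailure({"code": 11000})))  # False
print(is_retryable_top_level_error(FakeInvalidBSON("bad document")))        # False
```

Calling original_check(FakeInvalidBSON("bad document")) reproduces the AttributeError from the log, while the guarded version returns False and leaves the original exception available to be reported.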

            Assignee:
            shruti.sridhar@mongodb.com Shruti Sridhar (Inactive)
            Reporter:
            shane.harvey@mongodb.com Shane Harvey
            Votes:
            0
            Watchers:
            1
