AutoReconnect Error when Fetching Documents after Long Running Task


    • Type: Bug
    • Resolution: Unresolved
    • Priority: Major - P3
    • 4.15.5
    • Affects Version/s: 4.12.1
    • Component/s: None
    • None
    • Python Drivers

      1. What would you like to communicate to the user about this feature?
      2. Would you like the user to see examples of the syntax and/or executable code and its output?
      3. Which versions of the driver/connector does this apply to?


      Detailed steps to reproduce the problem?

       

      I get an error with the steps below:
      1. Create a session from the MongoDB client.
      2. Run a long processing task (to reproduce this, I use time.sleep(41)).
      3. Then fetch many documents from a single collection (264 documents in my case).
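
One thing worth ruling out for step 2 (an assumption, not something this report confirms): in an asyncio program, time.sleep() blocks the event loop, so no other task on that loop can run until it returns. If any of the driver's background work shares that loop, it would be stalled for the same 41 seconds. Whether that explains the cleared pool here is not established. The sketch below uses only the standard library, and count_ticks is a hypothetical helper. It counts how often a background task runs during a blocking sleep, compared with the same sleep moved to a worker thread.

```python
import asyncio
import time


async def count_ticks(blocking: bool, duration: float = 0.3) -> int:
    """Return how many times a background task ran while we 'worked'."""
    ticks = 0

    async def ticker() -> None:
        # Stand-in for any periodic background task on the event loop.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    before = ticks

    if blocking:
        time.sleep(duration)  # blocks the whole event loop
    else:
        await asyncio.to_thread(time.sleep, duration)  # loop keeps running

    observed = ticks - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return observed


if __name__ == "__main__":
    print("ticks during time.sleep:       ", asyncio.run(count_ticks(True)))
    print("ticks during asyncio.to_thread:", asyncio.run(count_ticks(False)))
```

If the real long-running task is synchronous, running it with asyncio.to_thread (or using asyncio.sleep in the reproduction) is one way to test whether the error still appears when the loop is not blocked.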

      PyMongo then throws the error below:

       

      --> 129 existing_docs = await collection.find(
          130     {
          131         "db_connection_id": db_connection_id,
          132         "is_deleted": False,
          133     },
          134     session=self.db_context.session,
          135 ).to_list()
          137 existing_relationships = [
          138     TableRelationship.model_validate(doc, by_alias=True)
          139     for doc in existing_docs
          140 ]
          142 return
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\_csot.py:109, in apply.<locals>.csot_wrapper(self, *args, **kwargs)
          107         with _TimeoutContext(timeout):
          108             return await func(self, *args, **kwargs)
      --> 109 return await func(self, *args, **kwargs)
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\cursor.py:1318, in AsyncCursor.to_list(self, length)
         1316     raise ValueError("to_list() length must be greater than 0")
         1317 while self.alive:
      -> 1318     if not await self._next_batch(res, remaining):
         1319         break
         1320     if length is not None:
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\cursor.py:1274, in AsyncCursor._next_batch(self, result, total)
         1272 if self._empty:
         1273     return False
      -> 1274 if len(self._data) or await self._refresh():
         1275     if total is None:
         1276         result.extend(self._data)
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\cursor.py:1233, in AsyncCursor._refresh(self)
         1218     # Exhaust cursors don't send getMore messages.
         1219     g = self._getmore_class(
         1220         self._dbname,
         1221         self._collname,
         (...)   1231         self._comment,
         1232     )
      -> 1233     await self._send_message(g)
         1235 return len(self._data)
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\cursor.py:1104, in AsyncCursor._send_message(self, operation)
         1101     raise InvalidOperation("exhaust cursors do not support auto encryption")
         1103 try:
      -> 1104     response = await client._run_operation(
         1105         operation, self._unpack_response, address=self._address
         1106     )
         1107 except OperationFailure as exc:
         1108     if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust:
         1109         # Don't send killCursors because the cursor is already closed.
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\_csot.py:109, in apply.<locals>.csot_wrapper(self, *args, **kwargs)
          107         with _TimeoutContext(timeout):
          108             return await func(self, *args, **kwargs)
      --> 109 return await func(self, *args, **kwargs)
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\mongo_client.py:1923, in AsyncMongoClient._run_operation(self, operation, unpack_res, address)
         1913     operation.reset()  # Reset op in case of retry.
         1914     return await server.run_operation(
         1915         conn,
         1916         operation,
         (...)   1920         self,
         1921     )
      -> 1923 return await self._retryable_read(
         1924     _cmd,
         1925     operation.read_preference,
         1926     operation.session,  # type: ignore[arg-type]
         1927     address=address,
         1928     retryable=isinstance(operation, _Query),
         1929     operation=operation.name,
         1930 )
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\mongo_client.py:2032, in AsyncMongoClient._retryable_read(self, func, read_pref, session, operation, address, retryable, operation_id)
         2027 # Ensure that the client supports retrying on reads and there is no session in
         2028 # transaction, otherwise, we will not support retry behavior for this call.
         2029 retryable = bool(
         2030     retryable and self.options.retry_reads and not (session and session.in_transaction)
         2031 )
      -> 2032 return await self._retry_internal(
         2033     func,
         2034     session,
         2035     None,
         2036     operation,
         2037     is_read=True,
         2038     address=address,
         2039     read_pref=read_pref,
         2040     retryable=retryable,
         2041     operation_id=operation_id,
         2042 )
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\_csot.py:109, in apply.<locals>.csot_wrapper(self, *args, **kwargs)
          107         with _TimeoutContext(timeout):
          108             return await func(self, *args, **kwargs)
      --> 109 return await func(self, *args, **kwargs)
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\mongo_client.py:1988, in AsyncMongoClient._retry_internal(self, func, session, bulk, operation, is_read, address, read_pref, retryable, operation_id)
         1962 @_csot.apply
         1963 async def _retry_internal(
         1964     self,
         (...)   1973     operation_id: Optional[int] = None,
         1974 ) -> T:
         1975     """Internal retryable helper for all client transactions.
         1976 
         1977     :param func: Callback function we want to retry
         (...)   1986     :return: Output of the calling func()
         1987     """
      -> 1988     return await _ClientConnectionRetryable(
         1989         mongo_client=self,
         1990         func=func,
         1991         bulk=bulk,
         1992         operation=operation,
         1993         is_read=is_read,
         1994         session=session,
         1995         read_pref=read_pref,
         1996         address=address,
         1997         retryable=retryable,
         1998         operation_id=operation_id,
         1999     ).run()
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\mongo_client.py:2744, in _ClientConnectionRetryable.run(self)
         2742 self._check_last_error(check_csot=True)
         2743 try:
      -> 2744     return await self._read() if self._is_read else await self._write()
         2745 except ServerSelectionTimeoutError:
         2746     # The application may think the write was never attempted
         2747     # if we raise ServerSelectionTimeoutError on the retry
         2748     # attempt. Raise the original exception instead.
         2749     self._check_last_error()
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\mongo_client.py:2891, in _ClientConnectionRetryable._read(self)
         2889 self._server = await self._get_server()
         2890 assert self._read_pref is not None, "Read Preference required on read calls"
      -> 2891 async with self._client._conn_from_server(self._read_pref, self._server, self._session) as (
         2892     conn,
         2893     read_pref,
         2894 ):
         2895     if self._retrying and not self._retryable:
         2896         self._check_last_error()
      
      File ~\AppData\Local\Programs\Python\Python313\Lib\contextlib.py:214, in _AsyncGeneratorContextManager.__aenter__(self)
          212 del self.args, self.kwds, self.func
          213 try:
      --> 214     return await anext(self.gen)
          215 except StopAsyncIteration:
          216     raise RuntimeError("generator didn't yield") from None
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\mongo_client.py:1852, in AsyncMongoClient._conn_from_server(self, read_preference, server, session)
         1843 # Get a connection for a server matching the read preference, and yield
         1844 # conn with the effective read preference. The Server Selection
         1845 # Spec says not to send any $readPreference to standalones and to
         (...)   1849 # NOTE: We already opened the Topology when selecting a server so there's no need
         1850 # to call _get_topology() again.
         1851 single = self._topology.description.topology_type == TOPOLOGY_TYPE.Single
      -> 1852 async with self._checkout(server, session) as conn:
         1853     if single:
         1854         if conn.is_repl and not (session and session.in_transaction):
         1855             # Use primary preferred to ensure any repl set member
         1856             # can handle the request.
      
      File ~\AppData\Local\Programs\Python\Python313\Lib\contextlib.py:214, in _AsyncGeneratorContextManager.__aenter__(self)
          212 del self.args, self.kwds, self.func
          213 try:
      --> 214     return await anext(self.gen)
          215 except StopAsyncIteration:
          216     raise RuntimeError("generator didn't yield") from None
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\mongo_client.py:1762, in AsyncMongoClient._checkout(self, server, session)
         1760     yield session._pinned_connection
         1761     return
      -> 1762 async with await server.checkout(handler=err_handler) as conn:
         1763     # Pin this session to the selected server or connection.
         1764     if (
         1765         in_txn
         1766         and session
         (...)   1771         )
         1772     ):
         1773         session._pin(server, conn)
      
      File ~\AppData\Local\Programs\Python\Python313\Lib\contextlib.py:214, in _AsyncGeneratorContextManager.__aenter__(self)
          212 del self.args, self.kwds, self.func
          213 try:
      --> 214     return await anext(self.gen)
          215 except StopAsyncIteration:
          216     raise RuntimeError("generator didn't yield") from None
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\pool.py:1120, in Pool.checkout(self, handler)
         1111 if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
         1112     _debug_log(
         1113         _CONNECTION_LOGGER,
         1114         message=_ConnectionStatusMessage.CHECKOUT_STARTED,
         (...)   1117         serverPort=self.address[1],
         1118     )
      -> 1120 conn = await self._get_conn(checkout_started_time, handler=handler)
         1122 duration = time.monotonic() - checkout_started_time
         1123 if self.enabled_for_cmap:
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\pool.py:1236, in Pool._get_conn(self, checkout_started_time, handler)
         1233     deadline = None
         1235 async with self.size_cond:
      -> 1236     self._raise_if_not_ready(checkout_started_time, emit_event=True)
         1237     while not (self.requests < self.max_pool_size):
         1238         timeout = deadline - time.monotonic() if deadline else None
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\asynchronous\pool.py:1188, in Pool._raise_if_not_ready(self, checkout_started_time, emit_event)
         1176         _debug_log(
         1177             _CONNECTION_LOGGER,
         1178             message=_ConnectionStatusMessage.CHECKOUT_FAILED,
         (...)   1184             durationMS=duration,
         1185         )
         1187 details = _get_timeout_details(self.opts)
      -> 1188 _raise_connection_failure(
         1189     self.address, AutoReconnect("connection pool paused"), timeout_details=details
         1190 )
      
      File c:\Users\johan\source\StartUp\Services\common.agent\venv\Lib\site-packages\pymongo\pool_shared.py:149, in _raise_connection_failure(address, error, msg_prefix, timeout_details)
          147     raise NetworkTimeout(msg) from error
          148 else:
      --> 149     raise AutoReconnect(msg) from error
      
      AutoReconnect: localhost:27017: connection pool paused (configured timeouts: connectTimeoutMS: 20000.0ms)

       

       

      For more context, I added the listener below to the client:

      import logging
      import uuid
      from abc import ABC, abstractmethod
      from contextlib import asynccontextmanager

      from pymongo import AsyncMongoClient, monitoring


      class Listener(monitoring.ConnectionPoolListener):
          # Logs every connection pool (CMAP) event emitted by the driver.
          logger = logging.getLogger("MongoDBListener")

          def pool_created(self, event):
              self.logger.info(f"[POOL CREATED] {event.address}")

          def pool_ready(self, event):
              self.logger.info(f"[POOL READY] {event.address}")

          def pool_cleared(self, event):
              self.logger.info(f"[POOL CLEARED] {event.address}")

          def pool_closed(self, event):
              self.logger.info(f"[POOL CLOSED] {event.address}")

          def connection_created(self, event):
              self.logger.info(f"[CONN CREATED] {event.address}, id={event.connection_id}")

          def connection_ready(self, event):
              self.logger.info(f"[CONN READY] {event.address}, id={event.connection_id}")

          def connection_closed(self, event):
              self.logger.info(f"[CONN CLOSED] {event.address}, id={event.connection_id}, reason={event.reason}")

          def connection_check_out_started(self, event):
              self.logger.info(f"[CHECKOUT STARTED] {event.address}")

          def connection_checked_out(self, event):
              self.logger.info(f"[CHECKED OUT] {event.address}, id={event.connection_id}")

          def connection_checked_in(self, event):
              self.logger.info(f"[CHECKED IN] {event.address}, id={event.connection_id}")

          def connection_check_out_failed(self, event):
              self.logger.info(f"[CHECKOUT FAILED] {event.address}, reason={event.reason}")


      class BaseConversationTest(ABC):
          ...

          @abstractmethod
          async def _ainitialize(self):
              pass

          @classmethod
          @asynccontextmanager
          async def acreate(
              cls,
              ...
          ):
              try:
                  db_client = AsyncMongoClient(
                      _app_settings.db.db_url,
                      uuidRepresentation="standard",
                      event_listeners=[Listener()],
                  )

                  # One session is shared by every operation in the test.
                  async with db_client.start_session() as session:
                      db_context = AsyncMongoDbContext(
                          db=db_client[_app_settings.db.db_name], session=session
                      )
                      instance = cls()
                      instance.settings = _app_settings

                      instance.db_context = db_context

                      instance.chat_service = ChatService(instance.db_context)

                      encryption_service = mock_boto_encryption_service()
                      instance.db_service = DbService(instance.db_context, encryption_service)

                      llm = get_llm()
                      file_service = mock_boto_file_service()
                      hash_service = get_hash_service()
                      anchor_vectorstore = get_db_connection_anchor_vectorstore()
                      db_connection_helper = get_db_connection_helper(
                          instance.db_context,
                          anchor_vectorstore,
                          encryption_service,
                          file_service,
                      )
                      checkpointer = InMemorySaver()
                      callback_handlers = []

                      if enable_langfuse:
                          langfuse_handler = CallbackHandler()
                          callback_handlers.append(langfuse_handler)
                      ...
                      if not skip_initialization:
                          dump_path = f"tmp/test_class_dump/{uuid.uuid4()}"
                          instance._dump_database(dump_path)

                          await instance._ainitialize()
                      yield instance
              except Exception as e:
                  logging.error("An error occurred during test setup.")
                  instance._restore_database(dump_path)

                  raise e
              finally:
                  ...
                  if dump_path is not None and not skip_class_level_restore:
                      instance._restore_database(dump_path)

       

      RUN_INIT_TEST_DATA = True

      async def ainitialize_test_data():
          async with ConversationTest.acreate(
              enable_langfuse=True, skip_class_level_restore=True
          ):
              pass

      if RUN_INIT_TEST_DATA:
          await ainitialize_test_data()

       

      Below are the logs I get when running the ainitialize_test_data function:

       

      2025-11-16 20:40:48,052 [INFO] [MongoDBListener] [POOL CREATED] ('127.0.0.1', 27017)
      2025-11-16 20:40:48,072 [INFO] [MongoDBListener] [POOL READY] ('127.0.0.1', 27017)
      2025-11-16 20:40:48,073 [INFO] [MongoDBListener] [POOL CREATED] ('localhost', 27017)
      2025-11-16 20:40:48,090 [INFO] [MongoDBListener] [POOL READY] ('localhost', 27017)
      2025-11-16 20:40:48,090 [INFO] [MongoDBListener] [POOL CLOSED] ('127.0.0.1', 27017)
      2025-11-16 20:40:48,091 [INFO] [MongoDBListener] [CHECKOUT STARTED] ('localhost', 27017)
      2025-11-16 20:40:48,092 [INFO] [MongoDBListener] [CONN CREATED] ('localhost', 27017), id=1
      2025-11-16 20:40:48,112 [INFO] [MongoDBListener] [CONN READY] ('localhost', 27017), id=1
      2025-11-16 20:40:48,112 [INFO] [MongoDBListener] [CHECKED OUT] ('localhost', 27017), id=1
      2025-11-16 20:40:48,116 [INFO] [MongoDBListener] [CHECKED IN] ('localhost', 27017), id=1
      2025-11-16 20:40:48,118 [INFO] [MongoDBListener] [CHECKOUT STARTED] ('localhost', 27017)
      2025-11-16 20:40:48,118 [INFO] [MongoDBListener] [CHECKED OUT] ('localhost', 27017), id=1
      2025-11-16 20:40:48,126 [INFO] [MongoDBListener] [CHECKED IN] ('localhost', 27017), id=1
      2025-11-16 20:40:48,127 [INFO] [MongoDBListener] [CHECKOUT STARTED] ('localhost', 27017)
      2025-11-16 20:40:48,127 [INFO] [MongoDBListener] [CHECKED OUT] ('localhost', 27017), id=1
      2025-11-16 20:40:48,130 [INFO] [MongoDBListener] [CHECKED IN] ('localhost', 27017), id=1
      2025-11-16 20:41:32,913 [INFO] [DataAnalystForBusinessUserAgentService] Updating DB connection table relationships for DB connection id 00000000-0000-0000-0000-000000000001.
      2025-11-16 20:41:32,913 [INFO] [MongoDBListener] [CHECKOUT STARTED] ('localhost', 27017)
      2025-11-16 20:41:32,914 [INFO] [MongoDBListener] [CHECKED OUT] ('localhost', 27017), id=1
      2025-11-16 20:41:32,916 [INFO] [MongoDBListener] [POOL CLEARED] ('localhost', 27017)
      2025-11-16 20:41:32,931 [INFO] [MongoDBListener] [CHECKED IN] ('localhost', 27017), id=1
      2025-11-16 20:41:32,932 [INFO] [MongoDBListener] [CONN CLOSED] ('localhost', 27017), id=1, reason=stale
      2025-11-16 20:41:32,933 [INFO] [MongoDBListener] [CHECKOUT STARTED] ('localhost', 27017)
      2025-11-16 20:41:32,933 [INFO] [MongoDBListener] [CHECKOUT FAILED] ('localhost', 27017), reason=connectionError 
      AutoReconnect: localhost:27017: connection pool paused (configured timeouts: connectTimeoutMS: 20000.0ms)
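
The idle period before the pool is cleared can be read directly from the timestamps above. A minimal sketch, assuming the log format shown in this report (timestamp in the first 23 characters of each line); largest_gap is a hypothetical helper, and the two sample lines are copied from the log:

```python
from datetime import datetime
from typing import Iterable, List

LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def largest_gap(lines: Iterable[str]) -> float:
    """Return the largest gap, in seconds, between consecutive log lines."""
    stamps: List[datetime] = []
    for line in lines:
        try:
            stamps.append(datetime.strptime(line.strip()[:23], LOG_TIME_FORMAT))
        except ValueError:
            continue  # skip lines without a leading timestamp
    gaps = [(b - a).total_seconds() for a, b in zip(stamps, stamps[1:])]
    return max(gaps) if gaps else 0.0


sample = [
    "2025-11-16 20:40:48,130 [INFO] [MongoDBListener] [CHECKED IN] ('localhost', 27017), id=1",
    "2025-11-16 20:41:32,913 [INFO] [DataAnalystForBusinessUserAgentService] Updating DB connection table relationships",
    "AutoReconnect: localhost:27017: connection pool paused",
]
print(largest_gap(sample))  # 44.783
```

For this run, no pool or connection event is logged for about 44.8 seconds between the last check-in and the next checkout, after which the pool is cleared.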

      Notes:

      • The error isn't thrown when I lower the time.sleep interval (for example, to 20 seconds).
      • The error isn't thrown when I set a large batch_size for the find query (for example, 1000).

      I suspect this only happens when the document count is larger than batch_size, so that PyMongo has to run its auto-reconnect logic mid-query.
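
The batch_size observation is consistent with how cursors fetch results. A minimal sketch, assuming the documented server default of 101 documents for the first batch of a find and documents small enough that the 16 MiB batch limit does not apply; needs_getmore is a hypothetical helper:

```python
from typing import Optional

# Assumption: server default for the first batch of a find when no
# batch size is given (documents must also fit within 16 MiB).
DEFAULT_FIRST_BATCH = 101


def needs_getmore(doc_count: int, batch_size: Optional[int] = None) -> bool:
    """Return True if the cursor must issue at least one getMore."""
    first_batch = batch_size if batch_size else DEFAULT_FIRST_BATCH
    return doc_count > first_batch


print(needs_getmore(264))        # True: 264 > 101, a getMore is required
print(needs_getmore(264, 1000))  # False: everything fits in the first batch
```

With 264 matching documents and no batch_size, the cursor needs a second round trip, which matches the _refresh and _send_message frames in the traceback. With batch_size=1000 the whole result arrives in the first reply.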

      I'm also curious how to prevent the connection from being closed while the session is idle, since I'm not sure whether this is caused by Docker, the MongoDB client, or the MongoDB server.

       

      Definition of done: what must be done to consider the task complete?

      The error no longer appears when the code is run, either by:

      • Fixing the auto-reconnect logic

      or

      • Providing a way to keep the connection alive across a long processing task that leaves the connection idle.
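
Until one of these is resolved, a possible application-level workaround is to retry the whole query. A minimal sketch, not a confirmed fix: retry_async is a hypothetical helper, and it re-runs the entire operation because a partially consumed cursor cannot simply resume after a failed getMore. The exception type is passed in; with PyMongo this would be pymongo.errors.AutoReconnect.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    op: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    delay: float = 0.5,
) -> T:
    """Run op(), retrying from scratch when one of retry_on is raised."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Linear backoff, giving the pool time to become ready again.
            await asyncio.sleep(delay * attempt)
    raise AssertionError("unreachable")


# Hypothetical usage with the query from this report:
#
#   from pymongo.errors import AutoReconnect
#
#   existing_docs = await retry_async(
#       lambda: collection.find(query, session=session).to_list(),
#       retry_on=(AutoReconnect,),
#   )
```

Whether a retry succeeds depends on the pool being marked ready again before the attempts run out, so the delay and attempt count here are placeholders.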

        The exact Python version used, with patch level:

      3.13.1 (tags/v3.13.1:0671451, Dec 3 2024, 19:06:28) [MSC v.1942 64 bit (AMD64)]

      The exact version of PyMongo used, with patch level:

      $ python -c "import pymongo; print(pymongo.version); print(pymongo.has_c())"

      4.12.1
      True

      Describe how MongoDB is set up. Local vs Hosted, version, topology, load balanced, etc.

       

      services:
        local-common-agent-db:
          image: mongo:8.0.12
          command:
            [
              "--replSet",
              "rs0",
              "--bind_ip_all",
              "--port",
              "27017",
              "--setParameter",
              "transactionLifetimeLimitSeconds=900",
            ]
          sysctls:
            net.ipv4.tcp_keepalive_time: 900
            net.ipv4.tcp_keepalive_intvl: 5
            net.ipv4.tcp_keepalive_probes: 3
          ports:
            - 27017:27017
          healthcheck:
            test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --port 27017 --quiet
            interval: 5s
            timeout: 30s
            start_period: 0s
            start_interval: 1s
            retries: 30
          volumes:
            - "local-common-agent-db-data:/data/db"
            - "local-common-agent-db-config:/data/configdb"

      volumes:
        local-common-agent-db-data:
        local-common-agent-db-config:
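
For reference, the keepalive sysctls above translate into the following timing inside the container. A small sketch of the arithmetic, assuming standard Linux TCP keepalive semantics (first probe after tcp_keepalive_time idle seconds, then one probe every tcp_keepalive_intvl seconds, connection dropped after tcp_keepalive_probes unanswered probes):

```python
# Values copied from the compose file above.
TCP_KEEPALIVE_TIME = 900   # idle seconds before the first probe
TCP_KEEPALIVE_INTVL = 5    # seconds between probes
TCP_KEEPALIVE_PROBES = 3   # unanswered probes before the connection is dropped


def seconds_until_dead_peer_detected(time_s: int, intvl_s: int, probes: int) -> int:
    """Idle time after which an unresponsive peer is declared dead."""
    return time_s + intvl_s * probes


print(seconds_until_dead_peer_detected(
    TCP_KEEPALIVE_TIME, TCP_KEEPALIVE_INTVL, TCP_KEEPALIVE_PROBES
))  # 915
```

Under these assumptions the server-side keepalive does not send its first probe until 900 seconds of idle time, far longer than the roughly 45-second gap in the logs, so these settings alone would not be expected to close the connection in this window.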
      
      

      I host it locally using Docker Desktop on Windows 11.

      docker version
      Client:
       Version:           28.4.0
       API version:       1.51
       Go version:        go1.24.7
       Git commit:        d8eb465
       Built:             Wed Sep  3 20:59:40 2025
       OS/Arch:           windows/amd64
       Context:           desktop-linux

      Server: Docker Desktop 4.46.0 (204649)
       Engine:
        Version:          28.4.0
        API version:      1.51 (minimum version 1.24)
        Go version:       go1.24.7
        Git commit:       249d679
        Built:            Wed Sep  3 20:57:37 2025
        OS/Arch:          linux/amd64
        Experimental:     false
       containerd:
        Version:          1.7.27
        GitCommit:        05044ec0a9a75232cad458027ca83437aae3f4da
       runc:
        Version:          1.2.5
        GitCommit:        v1.2.5-0-g59923ef
       docker-init:
        Version:          0.19.0
        GitCommit:        de40ad0 

      This is the connection string that I use: 
      "mongodb://127.0.0.1:27017/?replicaSet=rs0"
       

      The operating system and version (e.g. Windows 7, OSX 10.8, ...)

      Windows 11

      Web framework or asynchronous network library used, if any, with version (e.g. Django 1.7, mod_wsgi 4.3.0, gevent 1.0.1, Tornado 4.0.2, ...)


            Assignee:
            Noah Stapp
            Reporter:
            Johanes Lee
            Votes:
            0
            Watchers:
            4
