[SERVER-53500] Investigate making nested futures equivalent to chained continuations Created: 23/Dec/20  Updated: 06/Dec/22

Status: Backlog
Project: Core Server
Component/s: Internal Code
Affects Version/s: None
Fix Version/s: None

Type: Task Priority: Major - P3
Reporter: Max Hirschhorn Assignee: Backlog - Service Architecture
Resolution: Unresolved Votes: 0
Labels: sa-remove-fv-backlog-22
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
related to SERVER-54119 Destroy SharedStateBase as we unfold ... Closed
related to SERVER-54408 Rewrite AsyncTry-until to not use fut... Closed
Assigned Teams:
Service Arch
Participants:
Story Points: 7

 Description   

It is easy to misuse mongo::Future by chaining continuations through nesting (for example, returning from a continuation a future produced by another function) rather than chaining them at the top level. Nesting futures can trip the kMaxDepth invariant, which is only checked in --dbg=on builds, and it isn't obvious to consumers of futures why two syntactically different but seemingly equivalent forms can behave so differently.
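The toy, single-threaded C++ model below illustrates the depth concern. Everything in it (`ToyState`, `kToyMaxDepth`, `runNested`, `runChained`) is hypothetical and is not the mongo::Future API. It assumes that a future returned from inside a continuation must forward its eventual result into the enclosing continuation's state, and it treats each such forwarding hop as one level of depth checked against a debug-style cap that stands in for kMaxDepth.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <optional>

// Hypothetical debug-only cap standing in for kMaxDepth; the value is illustrative.
constexpr int kToyMaxDepth = 32;

struct ToyState {
    std::optional<int> value;             // set once this state is completed directly
    std::shared_ptr<ToyState> forwardTo;  // non-null if this state only forwards its result
    int depth = 0;                        // forwarding hops from this state to the root
};

// Nested shape: iteration i's future is returned from inside iteration i-1's
// continuation, so its state must forward into the previous one.
// Returns false at the point where the toy invariant would fire.
bool runNested(int iterations, int& maxDepthSeen) {
    assert(iterations >= 0);
    auto current = std::make_shared<ToyState>();
    maxDepthSeen = 0;
    for (int i = 0; i < iterations; ++i) {
        auto inner = std::make_shared<ToyState>();
        inner->forwardTo = current;
        inner->depth = current->depth + 1;
        if (inner->depth >= kToyMaxDepth) {
            return false;  // the toy depth invariant would be violated here
        }
        maxDepthSeen = inner->depth;
        current = inner;
    }
    return true;
}

// Flat shape: every iteration is a step on one top-level chain and completes its
// own state, so nothing forwards and depth never grows.
bool runChained(int iterations, int& maxDepthSeen) {
    assert(iterations >= 0);
    maxDepthSeen = 0;
    for (int i = 0; i < iterations; ++i) {
        auto step = std::make_shared<ToyState>();
        step->value = i;
        maxDepthSeen = std::max(maxDepthSeen, step->depth);
    }
    return true;
}
```

Under these assumptions the nested shape fails after a fixed number of iterations no matter how cheap each iteration is, while the flat shape never accumulates depth. In the toy, an iteration count like the 3000 used in the test changes on this ticket is far more than enough to trip the cap.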

diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h
index ff846fabc3..d1891e8216 100644
--- a/src/mongo/util/future_util.h
+++ b/src/mongo/util/future_util.h
@@ -263,15 +263,17 @@ private:
             // If the request is already canceled, don't run anything.
             if (cancelToken.isCanceled())
                 return ExecutorFuture<ReturnType>(executor, asyncTryCanceledStatus());
-            auto future = ExecutorFuture<void>(executor).then(executeLoopBody);
 
-            return std::move(future).onCompletion(
-                [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) {
-                    if (shouldStopIteration(s))
-                        return ExecutorFuture<ReturnType>(executor, std::move(s));
-
-                    return run();
-                });
+            return ExecutorFuture<void>(executor).then([this, self = this->shared_from_this()] {
+                return ExecutorFuture<void>(executor)
+                    .then([this, self = this->shared_from_this()] { return executeLoopBody(); })
+                    .onCompletion(
+                        [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) {
+                            if (shouldStopIteration(s))
+                                return ExecutorFuture<ReturnType>(executor, std::move(s));
+                            return run();
+                        });
+            });
         }
 
         std::shared_ptr<executor::TaskExecutor> executor;
diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp
index e9878d21e8..731b4505f6 100644
--- a/src/mongo/util/future_util_test.cpp
+++ b/src/mongo/util/future_util_test.cpp
@@ -35,6 +35,7 @@
 #include "mongo/executor/thread_pool_task_executor_test_fixture.h"
 #include "mongo/unittest/barrier.h"
 #include "mongo/unittest/death_test.h"
+#include "mongo/util/concurrency/thread_pool.h"
 #include "mongo/util/future_util.h"
 
 namespace mongo {
@@ -46,7 +47,10 @@ public:
         auto network = std::make_unique<executor::NetworkInterfaceMock>();
         _network = network.get();
 
-        _executor = makeSharedThreadPoolTestExecutor(std::move(network));
+        ThreadPool::Options threadPoolOptions;
+        threadPoolOptions.maxThreads = 10;
+        _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+            std::make_unique<ThreadPool>(std::move(threadPoolOptions)), std::move(network));
         _executor->startup();
     }
 
@@ -109,6 +113,22 @@ TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) {
     ASSERT_EQ(i, numLoops);
 }
 
+TEST_F(AsyncTryUntilTest, LoopReturnsFutureAndExecutesUntilConditionIsTrue) {
+    const int numLoops = 3000;
+    auto i = 0;
+    auto resultFut = AsyncTry([&] {
+                         return makeReadyFutureWith([&] {
+                             ++i;
+                             return i;
+                         });
+                     })
+                         .until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
+                         .on(executor(), CancelationToken::uncancelable());
+    resultFut.wait();
+
+    ASSERT_EQ(i, numLoops);
+}
+
 TEST_F(AsyncTryUntilTest, LoopDoesNotRespectConstDelayIfConditionIsAlreadyTrue) {
     auto i = 0;
     auto resultFut = AsyncTry([&] { ++i; })

Acceptance criteria:

  • We have understood and documented the conditions under which the max depth invariant can be hit, including how the isJustForContinuation optimization works and what the execution graph looks like in different cases
  • Outline a list of proposed optimizations for different future chain and tree structures that would let us avoid hitting this invariant or, if that is not possible, a proposal for making it safe to remove the invariant. Specifically, we should be able to write nested async loops in a standard way without hitting this invariant.
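The first criterion mentions the isJustForContinuation optimization. One way to picture the general idea behind such an optimization is that a state which exists only to forward its result can be skipped, so a newly returned future links directly to the state that will finally hold the value. The sketch below is a hypothetical toy (`ToyState`, `linkInto`, `complete`, and `runRecursiveLoop` are made-up names), not the logic in mongo's future implementation, and it ignores the memory-ordering concerns a real multithreaded version would have.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <optional>

struct ToyState {
    std::optional<int> value;             // set on the state that finally holds the result
    std::shared_ptr<ToyState> forwardTo;  // non-null: this state only forwards its result
    int depth = 0;                        // forwarding hops from this state to the root
};

// Make `inner` deliver its result into `outer`. With `bypass`, skip any
// forwarding-only states and link directly to the state that will hold the value.
void linkInto(const std::shared_ptr<ToyState>& inner, std::shared_ptr<ToyState> outer,
              bool bypass) {
    if (bypass) {
        while (outer->forwardTo) {
            outer = outer->forwardTo;
        }
    }
    inner->forwardTo = outer;
    inner->depth = outer->depth + 1;
}

// Complete a state by walking its forwarding chain to the state that stores the value.
void complete(std::shared_ptr<ToyState> s, int v) {
    while (s->forwardTo) {
        s = s->forwardTo;
    }
    assert(!s->value && "a state may only be completed once");
    s->value = v;
}

// Recursive loop in the style of AsyncTry's run(): each iteration hands back a new
// future that must complete the previous iteration's future. Returns the max depth.
int runRecursiveLoop(int iterations, bool bypass, std::shared_ptr<ToyState>& root) {
    root = std::make_shared<ToyState>();
    auto current = root;
    int maxDepth = 0;
    for (int i = 0; i < iterations; ++i) {
        auto next = std::make_shared<ToyState>();
        linkInto(next, current, bypass);
        maxDepth = std::max(maxDepth, next->depth);
        current = next;
    }
    complete(current, iterations);
    return maxDepth;
}
```

With `bypass` off, the depth equals the number of recursive iterations; with it on, every iteration sits one hop from the root and the value still reaches the root. Whether the real optimization applies this broadly to every chain and tree shape is what the criteria above ask to be documented.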


 Comments   
Comment by Billy Donahue [ 31/Jan/21 ]

I wrote a more basic repro here.

https://github.com/mongodb/mongo/compare/master...BillyDonahue:SERVER-53500_repro

This doesn't involve any classes other than Future<void>, and is not multithreaded, so it should be simpler to diagnose.

Comment by Billy Donahue [ 30/Jan/21 ]

What is the purpose of the patch in the description? I applied it and didn't see any test failures.

... Update:

For the record, on Mac at least, I needed to have --dbg=on AND --opt=on to repro this. Without --opt=on it doesn't fail.

Comment by Matthew Saltz (Inactive) [ 28/Jan/21 ]

Also, after discussion with others, I don't think it's possible to make them exactly equivalent, since they aren't truly equivalent. For instance, you could have branching that decides at runtime which of two nested future chains to use. We should treat the Acceptance Criteria here as more indicative of what needs to be done than the ticket title.

Comment by Matthew Saltz (Inactive) [ 28/Jan/21 ]

FWIW this invariant popped up again in a recent ticket. I think we need to prioritize this. I'm concerned that there may be edge cases that could cause serious problems in production that will be even harder to diagnose because that invariant won't be enabled.

Comment by Max Hirschhorn [ 04/Jan/21 ]

do you know if it's required to have more than one thread for the repro to work?

The different repro steps mentioned on this ticket passed locally for me with maxThreads=1.

Comment by Matthew Saltz (Inactive) [ 04/Jan/21 ]

Thanks for all the detailed repros! By the way - do you know if it's required to have more than one thread for the repro to work? If you don't know off the top of your head don't worry about going to try it - whoever ends up looking at this ticket can give it a shot.

Comment by Max Hirschhorn [ 30/Dec/20 ]

TryUntilLoopWithDelay chains the recursive call to run() onto the future returned by sleepFor() and so it can also hit the kMaxDepth invariant when the delay is short enough.

diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp
index e9878d21e8..2c46baaa2e 100644
--- a/src/mongo/util/future_util_test.cpp
+++ b/src/mongo/util/future_util_test.cpp
@@ -35,6 +35,7 @@
 #include "mongo/executor/thread_pool_task_executor_test_fixture.h"
 #include "mongo/unittest/barrier.h"
 #include "mongo/unittest/death_test.h"
+#include "mongo/util/concurrency/thread_pool.h"
 #include "mongo/util/future_util.h"
 
 namespace mongo {
@@ -46,7 +47,10 @@ public:
         auto network = std::make_unique<executor::NetworkInterfaceMock>();
         _network = network.get();
 
-        _executor = makeSharedThreadPoolTestExecutor(std::move(network));
+        ThreadPool::Options threadPoolOptions;
+        threadPoolOptions.maxThreads = 10;
+        _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+            std::make_unique<ThreadPool>(std::move(threadPoolOptions)), std::move(network));
         _executor->startup();
     }
 
@@ -96,13 +100,14 @@ TEST_F(AsyncTryUntilTest, LoopExecutesOnceWithAlwaysTrueCondition) {
 }
 
 TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) {
-    const int numLoops = 3;
+    const int numLoops = 3000;
     auto i = 0;
     auto resultFut = AsyncTry([&] {
                          ++i;
                          return i;
                      })
                          .until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
+                         .withDelayBetweenIterations(Milliseconds(0))
                          .on(executor(), CancelationToken::uncancelable());
     resultFut.wait();
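As described in the comment above, TryUntilLoopWithDelay chains each recursive run() onto the future returned by sleepFor(). For contrast, the hypothetical toy below (`ToyExecutor`, `LoopState`, and `scheduleLoop` are not mongo APIs) has each iteration schedule its successor on an executor queue instead of returning it as a nested future, keeping itself alive with a `shared_from_this()` capture in the same style as the AsyncTry patch in the description. Because nothing forwards into anything, neither the call stack nor any chain of pending states grows with the iteration count. It is single-threaded and ignores delays and cancellation.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical single-threaded executor: tasks run one at a time from a queue,
// so a task that schedules another task never nests on the call stack.
class ToyExecutor {
public:
    void schedule(std::function<void()> task) { _queue.push_back(std::move(task)); }

    // Drain the queue; returns the number of tasks executed.
    int runAll() {
        int ran = 0;
        while (!_queue.empty()) {
            std::function<void()> task = std::move(_queue.front());
            _queue.pop_front();
            task();  // may schedule more tasks; they run on later loop turns
            ++ran;
        }
        return ran;
    }

private:
    std::deque<std::function<void()>> _queue;
};

// Loop state kept alive only by the task that is currently queued.
struct LoopState : std::enable_shared_from_this<LoopState> {
    LoopState(ToyExecutor& e, std::function<int()> b, std::function<bool(int)> u,
              std::function<void(int)> d)
        : exec(e), body(std::move(b)), until(std::move(u)), onDone(std::move(d)) {}

    void iterate() {
        int result = body();
        if (until(result)) {
            onDone(result);
            return;
        }
        // Hand the next iteration back to the executor instead of returning it
        // as a nested future.
        exec.schedule([self = shared_from_this()] { self->iterate(); });
    }

    ToyExecutor& exec;
    std::function<int()> body;
    std::function<bool(int)> until;
    std::function<void(int)> onDone;
};

void scheduleLoop(ToyExecutor& exec, std::function<int()> body, std::function<bool(int)> until,
                  std::function<void(int)> onDone) {
    assert(body && until && onDone);
    auto state = std::make_shared<LoopState>(exec, std::move(body), std::move(until),
                                             std::move(onDone));
    exec.schedule([state] { state->iterate(); });
}
```

For example, driving `body = [&] { return ++i; }` until the result equals 3000 runs 3000 queued tasks, each starting from the executor's loop rather than from inside the previous iteration.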

Comment by Max Hirschhorn [ 23/Dec/20 ]

Note that with the current version of future_util.h, nesting AsyncTrys will also trigger the kMaxDepth invariant.

diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp
index e9878d21e8..3d0f057785 100644
--- a/src/mongo/util/future_util_test.cpp
+++ b/src/mongo/util/future_util_test.cpp
@@ -35,6 +35,7 @@
 #include "mongo/executor/thread_pool_task_executor_test_fixture.h"
 #include "mongo/unittest/barrier.h"
 #include "mongo/unittest/death_test.h"
+#include "mongo/util/concurrency/thread_pool.h"
 #include "mongo/util/future_util.h"
 
 namespace mongo {
@@ -46,7 +47,10 @@ public:
         auto network = std::make_unique<executor::NetworkInterfaceMock>();
         _network = network.get();
 
-        _executor = makeSharedThreadPoolTestExecutor(std::move(network));
+        ThreadPool::Options threadPoolOptions;
+        threadPoolOptions.maxThreads = 10;
+        _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+            std::make_unique<ThreadPool>(std::move(threadPoolOptions)), std::move(network));
         _executor->startup();
     }
 
@@ -96,14 +100,19 @@ TEST_F(AsyncTryUntilTest, LoopExecutesOnceWithAlwaysTrueCondition) {
 }
 
 TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) {
-    const int numLoops = 3;
+    const int numLoops = 3000;
     auto i = 0;
-    auto resultFut = AsyncTry([&] {
-                         ++i;
-                         return i;
-                     })
-                         .until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
-                         .on(executor(), CancelationToken::uncancelable());
+    auto resultFut =
+        AsyncTry([&] {
+            return AsyncTry([&] {
+                       ++i;
+                       return i;
+                   })
+                .until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
+                .on(executor(), CancelationToken::uncancelable());
+        })
+            .until([&](StatusWith<int> swInt) { return true; })
+            .on(executor(), CancelationToken::uncancelable());
     resultFut.wait();
 
     ASSERT_EQ(i, numLoops);

Generated at Thu Feb 08 05:31:06 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.