[JAVA-3907] AsyncQueryBatchCursor does not release ConnectionSource when closed Created: 10/Dec/20  Updated: 28/Oct/23  Resolved: 04/Feb/21

Status: Closed
Project: Java Driver
Component/s: Reactive Streams
Affects Version/s: None
Fix Version/s: 4.2.1

Type: Bug Priority: Major - P3
Reporter: Jeffrey Yemin Assignee: Valentin Kavalenka
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate
duplicates JAVA-3938 Change stream connections not returned Closed
is duplicated by JAVA-3938 Change stream connections not returned Closed
Issue split
split to JAVA-3961 Docs: Clarify cold publishers Closed
split to JAVA-3978 Introduce self-managing reference cou... Backlog
split to JAVA-3963 Document publisher properties in API ... Closed
split to JAVA-3964 Improve the documentation of the Refe... Closed
split to JAVA-3965 Document com.mongodb.ServerCursor wit... Closed
split to JAVA-3974 Async CryptBinding.retain() returns w... Closed
split to JAVA-3976 If Subscriber.onNext throws an except... Closed
split to JAVA-3972 Delete BatchCursor.tryNext and AsyncB... Closed
split to JAVA-3975 AsyncSessionBinding.retain and Sessio... Closed

 Description   

AsyncQueryBatchCursor#close does not release its reference to the ConnectionSource. This is not a problem if iteration completes successfully, as the getMore callback releases it when the server cursor is exhausted. But if iteration is interrupted by an error, or the application does not fully iterate the cursor, the reference is never released. This can cause a leak, as a ConnectionSource can hold a reference to a resource (most likely a ClientSession).
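
A rough illustration of the leak (a minimal sketch with hypothetical types, not the driver's actual internals):

final class LeakyCursorSketch {
    interface ConnectionSource { void release(); } // may hold a reference to a ClientSession

    private final ConnectionSource connectionSource; // reference held for the cursor's lifetime

    LeakyCursorSketch(final ConnectionSource connectionSource) {
        this.connectionSource = connectionSource;
    }

    void close() {
        // BUG (as described above): no connectionSource.release() here, so if the application
        // stops iterating early, or iteration fails, nothing ever releases the reference.
    }

    void getMoreCallback(final boolean serverCursorExhausted) {
        if (serverCursorExhausted) {
            connectionSource.release(); // the only release path: reached only when iteration completes successfully
        }
    }
}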



 Comments   
Comment by Githook User [ 05/Apr/21 ]

Author:

{'name': 'Valentin Kovalenko', 'email': 'valentin.kovalenko@mongodb.com', 'username': 'stIncMale'}

Message: Backport to 4.2.x JAVA-4044/PR#689 and the changes it depends on (JAVA-3938 & JAVA-3907 / PR#661) (#691)

  • Regression test for change stream cancellation (#661)

Ensures that all sessions are returned to the pool

JAVA-3938 JAVA-3907

  • Guarantee that ChangeStreamPublisher for a collection completes after dropping the collection (#689)

Before the changes made within JAVA-3973,
ChangeStreamPublisher had been terminating with onError.
After the changes in JAVA-3973 neither onError nor onComplete is called,
but those changes allow us to terminate it with onComplete.
I could have specified only assertTerminalEvent()
without specifying assertNoErrors(), thus accepting either onComplete or onError
(the old behavior), but terminating with onComplete is nicer.

The approach of using startAtOperationTime to ensure that
a change stream is guaranteed to observe collection.drop()
works only if there is no leader re-election that results in
rolling back the delete operation from which the operationTime
was extracted. While such rollback can be prevented by using
the "majority" write concern, the common approach in driver tests
is to not use it for efficiency and tolerate a tiny chance of
experiencing a rollback.
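
As an illustration of that approach, here is a minimal sketch using the synchronous API for brevity (hypothetical database and collection names; this is not the test code from the PR):

import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.BsonTimestamp;
import org.bson.Document;

final class StartAtOperationTimeSketch {
    public static void main(final String[] args) {
        try (MongoClient client = MongoClients.create();
             ClientSession session = client.startSession()) {
            MongoCollection<Document> collection = client.getDatabase("test").getCollection("coll");
            // The anchoring write: its operationTime is recorded on the session.
            collection.deleteMany(session, new Document());
            BsonTimestamp operationTime = session.getOperationTime();
            collection.drop();
            // Starting at operationTime means the stream is expected to observe the drop and complete,
            // unless a leader re-election rolls back the anchoring write
            // (which the "majority" write concern would prevent, at a cost).
            collection.watch()
                    .startAtOperationTime(operationTime)
                    .forEach(change -> System.out.println(change.getOperationType()));
        }
    }
}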

JAVA-4044

Co-authored-by: Ross Lawley <ross.lawley@gmail.com>
Branch: 4.2.x
https://github.com/mongodb/mongo-java-driver/commit/77c8795eec482f909cfb658b46771b51c2ded7e0

Comment by Githook User [ 08/Feb/21 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Regression test for change stream cancellation (#661)

Ensures that all sessions are returned to the pool

JAVA-3938 JAVA-3907
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/67a3adec5fef87c88adbf2124e242293d8f6fea5

Comment by Githook User [ 04/Feb/21 ]

Author:

{'name': 'Valentin Kovalenko', 'email': 'valentin.kovalenko@mongodb.com', 'username': 'stIncMale'}

Message: Make implementations of AsyncAggregateResponseBatchCursor more responsive to close signals (#654)

1) Change implementations of AsyncAggregateResponseBatchCursor to make them more responsive to close signals

AsyncAggregateResponseBatchCursor wraps AsyncQueryBatchCursor.
AsyncQueryBatchCursor implements pending cancel functionality.
AsyncQueryBatchCursor may enter a pending close state when close is called
concurrently with the object being in the pending operation state.
AsyncAggregateResponseBatchCursor does not need to duplicate this pending functionality;
in fact, doing so prevented AsyncQueryBatchCursor from receiving close signals while
in the pending operation state.
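
A minimal sketch of the pending-close pattern referred to above (hypothetical names, not the driver's actual state machine):

import java.util.concurrent.atomic.AtomicReference;

final class PendingCloseSketch {
    private enum State { IDLE, OPERATION_IN_PROGRESS, CLOSE_PENDING, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);

    void startOperation() {
        state.compareAndSet(State.IDLE, State.OPERATION_IN_PROGRESS);
    }

    void close() {
        if (state.compareAndSet(State.OPERATION_IN_PROGRESS, State.CLOSE_PENDING)) {
            return; // an operation is in flight; operationCompleted() will finish the close
        }
        if (state.compareAndSet(State.IDLE, State.CLOSED)) {
            releaseResources();
        }
        // already CLOSE_PENDING or CLOSED: nothing to do
    }

    void operationCompleted() {
        // Act on a close signal that arrived while the operation was in flight.
        if (state.compareAndSet(State.CLOSE_PENDING, State.CLOSED)) {
            releaseResources();
        } else {
            state.compareAndSet(State.OPERATION_IN_PROGRESS, State.IDLE);
        }
    }

    private void releaseResources() {
        // e.g., kill the server cursor and release the ConnectionSource
    }
}

With a single owner of this state machine, a close signal arriving mid-operation is never lost; the commit removes the duplicate copy of this logic from the wrapper.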

2) Release resources in AsyncQueryBatchCursor

The changes made in this commit appear to be correct from the standpoint of tests,
but it is very hard for me to prove that by inspecting the code. There are two reasons:

  • the reference counting approach used in the code
  • the presence of multiple places in the code where the instance field connectionSource
    is released, combined with the fact that it is not always retained in the constructor
    and must be released no more than once during the lifetime of an AsyncQueryBatchCursor object.

The following text expresses my thoughts about reference counting and is not directly
related to the changes made in this commit.
The current approach for reference counting in the driver code is what we may call
a release-early approach, similar to the one described in the Netty documentation
(https://github.com/netty/netty/wiki/Reference-counted-objects#who-destroys-it).
In this approach "the party that accesses a reference-counted object last
is also responsible for the destruction of that reference-counted object".
The problem with this approach is that it is not structured.

"Structured" reference counting.
The idea is to try and get our reference counting code closer to
following the structured programming approach
(https://en.wikipedia.org/wiki/Structured_programming)
while fixing a bug: AsyncQueryBatchCursor does not always release a connection
when it is not needed anymore.

In simple words, the following
is desirable: given a place in code where retain/release is called,
it should be simple to tell where the corresponding release/retain is called.

As a side note, structured programming applied to concurrent code
is called structured concurrency. This idea is realized in Kotlin:
https://kotlinlang.org/docs/reference/coroutines/basics.html#structured-concurrency,
https://elizarov.medium.com/structured-concurrency-722d765aa952
(the article is written by the author of Kotlin coroutines functionality).

As much as I would like us to use the structured approach, it is, generally speaking,
not possible to reconcile it with the release-early approach in the same codebase.
In other words, a complete refactoring may be needed to switch to the structured approach.
The following example demonstrates a situation in which synchronous code that was written
with the release-early approach behaves differently when combined with the structured approach.

Consider the following legacy code in which we want to refactor releaseEarlyLegacyMethod
(the only code that will stay unchanged is the releaseEarlyUnchangedCode method):

void releaseEarlyLegacyMethod() {
    ReferenceCounted resource = pool.borrow(); // count is 1
    /* Since ReleaseEarlyLegacyConsumer is the last party using the resource, it must release the resource.
     * A reader cannot find the release call corresponding to the borrow call inside the releaseEarlyLegacyMethod method. */
    releaseEarlyUnchangedCode(new ReleaseEarlyLegacyOneTimeConsumer(resource));
}

class ReleaseEarlyLegacyOneTimeConsumer implements Runnable {
    private final ReferenceCounted resource;

    ReleaseEarlyLegacyOneTimeConsumer(ReferenceCounted resource) {
        this.resource = resource; // count is 1
    }

    public void run() {
        try {
            System.out.println(resource.toString()); // count is 1; use the resource
        } finally {
            // release the resource because ReleaseEarlyLegacyConsumer.accept is the last party accessing it
            /* Note that by looking at this release call, a reader cannot easily find the corresponding retain/borrow call.
             * The corresponding retain/borrow call is not even in this class. */
            resource.release(); // count is 0; the resource returns to the pool
        }
    }
}

void releaseEarlyUnchangedCode(Runnable action) {
    action.run();
    ReferenceCounted resource = pool.borrow(); // count is 1; succeeds only if the single resource has been returned to the pool
    try {
        // use the resource
        System.out.println(resource.toString()); // count is 1
    } finally {
        // release the resource because releaseEarlyUnchangedCode is the last party accessing it
        resource.release(); // count is 0; the resource is returned to the pool
    }
}

The pool.borrow call in releaseEarlyUnchangedCode works because the resource is returned to the pool before action.run() returns.
Here is the refactored code that uses structured reference counting:

void structuredNewMethod() {
    ReferenceCounted resource = pool.borrow(); // count is 1
    // a reader can easily find the release call corresponding to the borrow call because it is in the finally block in this method
    try {
        releaseEarlyUnchangedCode(new StructuredNewOneTimeConsumer(resource));
    } finally {
        resource.release(); // count is 0; the resource returns to the pool
    }
}

class StructuredNewOneTimeConsumer implements Runnable {
    private final ReferenceCounted resource;

    StructuredNewOneTimeConsumer(ReferenceCounted resource) {
        this.resource = resource.retain(); // count is 2; we store a new reference to the resource
    }

    public void run() {
        try {
            System.out.println(resource.toString()); // count is 2; use the resource
        } finally {
            // release the resource because StructuredNewConsumer is not going to use it anymore
            /* The corresponding retain call is in the constructor of the class because resource is the instance field.
             * A reader may easily locate it. */
            resource.release(); // count is 1
        }
    }
}

With this new code, the pool.borrow call in releaseEarlyUnchangedCode fails because the resource is not returned to the pool before action.run() returns.
It is only returned to the pool before structuredNewMethod returns.
Thereby, we have shown that the release-early and the structured approaches are not reconcilable.

This would not be a problem if the code was written following the structured approach from the beginning
because in that case, releaseEarlyUnchangedCode would have been written differently
(using either release-early or structured reference counting definitely affects the structure of the rest of the code).

JAVA-3907
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/a3ef19b171cdbfaa4677486fb43a2e8877d36d3a

Comment by Valentin Kavalenka [ 27/Jan/21 ]

Scenario 2)

Changing AsyncQueryBatchCursor.handleGetMoreQueryResult alone to react to pending cancel does not help because AsyncQueryBatchCursor.close is not called to set pending cancel when Subscription.cancel is called. This happens due to the pending operation/cancel logic in AsyncChangeStreamBatchCursor that wraps AsyncQueryBatchCursor. My next step is to understand the pending operation/cancel logic in AsyncChangeStreamBatchCursor and see what can be done about it.

Comment by Ross Lawley [ 26/Jan/21 ]

That sounds like a bug / unintentional change in behaviour. Added JAVA-3961 to track.

Comment by Jeffrey Yemin [ 26/Jan/21 ]

ross.lawley do you recall if it is intentional that change stream cursors establish themselves before any documents are requested? Did we think that was necessary in order to establish the resume token? I don't recall.

Comment by Valentin Kavalenka [ 26/Jan/21 ]

Improve the memo in the Quick Start Primer

Currently it states:

All Publishers returned from the API are cold, meaning that no I/O happens until they are subscribed to and the subscription makes a request. So just creating a Publisher won’t cause any network IO. It’s not until Subscription.request() is called that the driver executes the operation.

The actual behavior of the ChangeStreamPublisher is:

  • Create a server cursor after returning from the method Subscriber.onSubscribe. This does not require calling the method Subscription.request.
  • Issue getMore as a result of calling Subscription.request.
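
This behavior can be demonstrated with a small sketch (hypothetical database and collection names; the preliminary I/O can be observed, e.g., via command monitoring or server logs). It never calls Subscription.request, yet the server cursor is still created:

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class SubscribeWithoutRequest {
    public static void main(final String[] args) throws InterruptedException {
        try (MongoClient client = MongoClients.create()) {
            client.getDatabase("test").getCollection("coll").watch()
                    .subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
                        @Override
                        public void onSubscribe(final Subscription s) {
                            // Intentionally no s.request(n). Per the memo no I/O should happen,
                            // yet the server cursor is created after this method returns.
                        }

                        @Override
                        public void onNext(final ChangeStreamDocument<Document> doc) {
                        }

                        @Override
                        public void onError(final Throwable t) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    });
            Thread.sleep(2000); // give the driver time to issue the preliminary aggregate command
        }
    }
}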

It may be more useful to say

  • All Publishers returned from the API are cold, meaning that no I/O happens until they are subscribed to and the subscription makes a request. So just creating a Publisher won’t cause any network I/O. It’s not until Subscription.request() is called that the driver executes the operation. However, some Publishers, e.g., ChangeStreamPublisher, may need to do preliminary I/O operations as a result of calling Publisher.subscribe(Subscriber).

It may also be useful to duplicate the full memo (both the "cold" part and the "unicast" part) in the documentation of the com.mongodb.reactivestreams.client package (it contains our Publisher classes).

Update: see JAVA-3961, JAVA-3963.

Comment by Valentin Kavalenka [ 22/Jan/21 ]

The following example is inspired by the example in JAVA-3938 but uses a single connection in the pool and runs a single iteration (hangs virtually every time it is run):

Hang.java

package reactivestreams.primer;
 
import com.mongodb.MongoClientSettings;
import com.mongodb.ServerAddress;
import com.mongodb.client.model.InsertManyOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.connection.AsynchronousSocketChannelStreamFactoryFactory;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterType;
import com.mongodb.diagnostics.logging.Logger;
import com.mongodb.diagnostics.logging.Loggers;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.util.List;
import org.bson.BsonValue;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
 
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
 
import static java.lang.String.format;
 
final class Hang {
    private static final Logger LOGGER = Loggers.getLogger(Hang.class.getSimpleName());
 
    public static void main(final String[] args) throws InterruptedException, IOException, ExecutionException {
        Duration waitTimeout = Duration.ofSeconds(4);
        int maxConnectionPoolSize = 1;
        try (com.mongodb.client.MongoClient writer = com.mongodb.client.MongoClients.create(
                mongoClientSettings("writer", maxConnectionPoolSize));
             MongoClient reader = MongoClients.create(mongoClientSettings("reader", maxConnectionPoolSize))) {
            String dbName = "handTest";
            String collectionName = "initiallyEmpty";
            writer.getDatabase(dbName).drop();
            com.mongodb.client.MongoCollection<Document> writerCollection = writer
                    .getDatabase(dbName)
                    .getCollection(collectionName);
            MongoCollection<Document> readerCollection = reader
                    .getDatabase(dbName)
                    .getCollection(collectionName);
            for (int i = 0; i < 1; i++) {
                int idx = i;
                LOGGER.info(format("iteration#%d", idx));
                ChangeStreamPublisher<Document> changeStreamPublisher = readerCollection.watch()
                        .batchSize(2)
                        .maxAwaitTime(2, TimeUnit.SECONDS);
                CompletableFuture<Subscription> watchSubscription = new CompletableFuture<>();
                changeStreamPublisher.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
                    private Subscription s;
 
                    @Override
                    public void onSubscribe(final Subscription s) {
                        LOGGER.info(format("watch.onSubscribe#%d", idx));
                        this.s = s;
                        watchSubscription.complete(s);
                        s.request(1);
                    }
 
                    @Override
                    public void onNext(final ChangeStreamDocument<Document> doc) {
                        LOGGER.info(format("watch.onNext#%d: %s", idx, doc.getDocumentKey()));
                        s.request(1);
                    }
 
                    @Override
                    public void onError(final Throwable t) {
                        LOGGER.info(format("watch.onError#%d", idx), t);
                    }
 
                    @Override
                    public void onComplete() {
                        LOGGER.info(format("watch.onComplete#%d", idx));
                    }
                });
                Thread.sleep(300);
                CountDownLatch latchCountDone = subscribeCount(readerCollection, idx);
                String countLatchCondition = format("count.onComplete/onError#%d", idx);
                {//count must be hanging
                    if (await(latchCountDone, waitTimeout, countLatchCondition)) {
                        LOGGER.error(format("waiting for %s unexpectedly succeeded", countLatchCondition));
                        return;
                    } else {
                        LOGGER.info(format("waiting for %s correctly failed", countLatchCondition));
                    }
                }
                {//cancel watch
                    watchSubscription.get().cancel();
                    LOGGER.info(format("watch#%d was cancelled", idx));
                }
                {//count must complete
                    final String condition = countLatchCondition + " after cancelling watching";
                    if (await(latchCountDone, waitTimeout, condition)) {
                        LOGGER.info(format("waiting for %s correctly succeeded", condition));
                        return;
                    } else {
                        LOGGER.error(format("waiting for %s incorrectly but expectedly failed", condition));
                    }
                }
                insert(writerCollection, List.of(Document.parse("{}"), Document.parse("{}"), Document.parse("{}")));
                {//count must complete
                    final String condition = countLatchCondition + " after inserting docs";
                    if (await(latchCountDone, waitTimeout, condition)) {
                        LOGGER.info("demonstrated expected incorrect behavior");
                    } else {
                        LOGGER.error("things are worse than expected");
                    }
                }
            }
            sleep(Duration.ofSeconds(6), "sleeping to see what happens next");
        }
    }
 
    private static void insert(final com.mongodb.client.MongoCollection<Document> writerCollection, final List<? extends Document> docs) {
        Map<Integer, BsonValue> insertedIds = writerCollection.insertMany(docs, new InsertManyOptions().ordered(false))
                .getInsertedIds();
        LOGGER.info(format("inserted %s", insertedIds));
    }
 
    private static CountDownLatch subscribeCount(final MongoCollection<Document> readerCollection, final int idx) {
        CountDownLatch latch = new CountDownLatch(1);
        readerCollection.countDocuments().subscribe(new Subscriber<Long>() {
            private Subscription s;
 
            @Override
            public void onSubscribe(final Subscription s) {
                LOGGER.info(format("count.onSubscribe#%d", idx));
                this.s = s;
                s.request(1);
            }
 
            @Override
            public void onNext(final Long count) {
                LOGGER.info(format("count.onNext#%d: %d", idx, count));
                s.cancel();
            }
 
            @Override
            public void onError(final Throwable t) {
                try {
                    LOGGER.info(format("count.onError#%d", idx), t);
                } finally {
                    latch.countDown();
                }
            }
 
            @Override
            public void onComplete() {
                try {
                    LOGGER.info(format("count.onComplete#%d", idx));
                } finally {
                    latch.countDown();
                }
            }
        });
        return latch;
    }
 
    private static boolean await(final CountDownLatch latch, @Nullable final Duration duration, final String condition)
            throws InterruptedException {
        long startNanos = System.nanoTime();
        long durationNanos = duration == null ? Long.MAX_VALUE : duration.toNanos();
        long partNanos = Math.min(TimeUnit.SECONDS.toNanos(1), durationNanos);
        boolean success = false;
        long leftNanos = durationNanos;
        while (!success && leftNanos > 0) {
            if (leftNanos != durationNanos) {
                LOGGER.info(format("waiting for %s%s",
                        condition,
                        duration == null
                                ? ""
                                : format(" (%.1fs left)",
                                (double)TimeUnit.NANOSECONDS.toMillis(leftNanos) / TimeUnit.SECONDS.toMillis(1))));
            }
            success = latch.await(Math.min(partNanos, leftNanos), TimeUnit.NANOSECONDS);
            leftNanos = durationNanos - (System.nanoTime() - startNanos);
        }
        LOGGER.info(format("waiting for %s (%s)", condition, success ? "success" : "failure"));
        return success;
    }
 
    private static void sleep(final Duration duration, final String description) throws InterruptedException {
        long startNanos = System.nanoTime();
        long durationNanos = duration.toNanos();
        long partNanos = Math.min(TimeUnit.SECONDS.toNanos(1), durationNanos);
        long leftNanos = durationNanos;
        while (leftNanos > 0) {
            if (leftNanos != durationNanos) {
                LOGGER.info(format("%s%s",
                        description,
                        format(" (%.1fs left)", (double)TimeUnit.NANOSECONDS.toMillis(leftNanos) / TimeUnit.SECONDS.toMillis(1))));
            }
            Thread.sleep(TimeUnit.NANOSECONDS.toMillis(Math.min(partNanos, leftNanos)));
            leftNanos = durationNanos - (System.nanoTime() - startNanos);
        }
    }
 
    private static MongoClientSettings mongoClientSettings(String clientName, final int maxPoolSize) throws IOException {
        ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
        AtomicLong threadCounter = new AtomicLong(1);
        ThreadFactory threadFactory = r -> {
            Thread thread = defaultThreadFactory.newThread(r);
            thread.setDaemon(true);
            thread.setName(format("%s-%d", clientName, threadCounter.getAndIncrement()));
            thread.setUncaughtExceptionHandler((t, e) -> LOGGER.error("Uncaught throwable", e));
            return thread;
        };
        return MongoClientSettings.builder()
                .applicationName(clientName)
                .applyToClusterSettings(builder -> builder
                        .mode(ClusterConnectionMode.MULTIPLE)
                        .requiredClusterType(ClusterType.REPLICA_SET)
                        .hosts(List.of(
                                new ServerAddress(ServerAddress.defaultHost(), ServerAddress.defaultPort()))))
                .applyToSocketSettings(builder -> builder
                        .connectTimeout(1, TimeUnit.DAYS)
                        .readTimeout(1, TimeUnit.DAYS))
                .applyToConnectionPoolSettings(builder -> builder
                        .maxSize(maxPoolSize)
                        .maxWaitTime(1, TimeUnit.DAYS))
                .streamFactoryFactory(AsynchronousSocketChannelStreamFactoryFactory.builder()
                        .group(AsynchronousChannelGroup.withThreadPool(
                                Executors.newCachedThreadPool(threadFactory)))
                        .build())
                .build();
    }
}

The relevant shortened output:

15:04:56.877 [main] INFO  org.mongodb.driver.Hang - iteration#0
15:04:57.031 [main] INFO  org.mongodb.driver.Hang - watch.onSubscribe#0
15:04:57.350 [main] INFO  org.mongodb.driver.Hang - count.onSubscribe#0
15:05:01.356 [main] INFO  org.mongodb.driver.Hang - waiting for count.onComplete/onError#0 correctly failed
15:05:01.356 [main] INFO  org.mongodb.driver.Hang - watch#0 was cancelled
15:05:05.360 [main] ERROR org.mongodb.driver.Hang - waiting for count.onComplete/onError#0 after cancelling watching incorrectly but expectedly failed
15:05:05.518 [main] INFO  org.mongodb.driver.Hang - inserted {0=BsonObjectId{value=600def114d612f20e9869ee4}, 1=BsonObjectId{value=600def114d612f20e9869ee5}, 2=BsonObjectId{value=600def114d612f20e9869ee6}}
15:05:05.698 [reader-2] INFO  org.mongodb.driver.Hang - count.onNext#0: 3
15:05:05.701 [reader-2] INFO  org.mongodb.driver.Hang - count.onComplete#0
15:05:05.701 [main] INFO  org.mongodb.driver.Hang - waiting for count.onComplete/onError#0 after inserting docs (success)
15:05:05.701 [main] INFO  org.mongodb.driver.Hang - demonstrated expected incorrect behavior

Writing it and playing with it was useful for my understanding of the problem and the expected behavior.

There are three scenarios I tried:
1) [OK] The getMore command fails due to read timeout.
Behavior: The driver closes the corresponding connection, logs the timeout exception (there is no other way for a user to know this happened), issues the killCursors command on a different connection to kill the original server cursor, issues another aggregate command with the $changeStream pipeline stage, and continues iterating by calling getMore.
The subscription and the subscriber work as if nothing happened.
2) [driver bug] JAVA-3938: Subscription.cancel is called.
Behavior: The corresponding connection is not released, and the driver continues calling getMore until some results are returned. After receiving at least one change, the driver issues the killCursors command and releases the connection. Neither Subscriber.onNext nor Subscriber.onError is called.
3) JAVA-3976: Subscriber.onNext throws an exception before/after calling Subscription.request.
Note that the connection is always released before Subscriber.onNext is called.
Behavior: Reactor logs the exception in some cases (e.g., an OOM is logged, a RuntimeException is not), neither Subscriber.onNext nor Subscriber.onError is called (this is fine according to Subscriber rule 13), and the server cursor is not killed by the driver (this is a problem).

Scenario 2)

Seems to be caused by JAVA-3487 (although this means neither that undoing the changes would necessarily fix the behavior, nor that the changes should simply be undone). Currently the driver acts on Subscription.cancel only when there is no pending operation on the cursor (e.g., when there is no pending getMore).

The correct behaviour is to cancel the pending operation and release all resources (I was unable to find any clear requirement to call Subscriber.onComplete in this scenario, and I interpret this to mean that calling the method is optional; relevant links: Reactor Reference Guide - Flux, org.reactivestreams.Subscriber and, tangentially, java.util.concurrent.Flow.Subscription.cancel - "subscription need not ever receive an onComplete or onError signal").

Cancelling the pending operation boils down to cancelling pending com.mongodb.connection.Stream.readAsync, which can be done only by calling Stream.close. Therefore, I think the following handling of Subscription.cancel is how the driver should ideally behave (it is possible that at this point I am missing some details, but I believe the outline is correct):

  1. Synchronously close the connection (not return to the pool, but actually close), which should in turn result in calling Stream.close.
  2. Asynchronously borrow another connection and call killCursors to kill the related server cursor.
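
A rough sketch of that outline, with hypothetical minimal types standing in for the driver's internal API:

final class CancelHandlingSketch {
    interface Connection { void close(); void release(); }
    interface AsyncConnectionSource {
        void getConnectionAsync(java.util.function.BiConsumer<Connection, Throwable> callback);
    }

    private final Connection connection;
    private final AsyncConnectionSource connectionSource;
    private final long serverCursorId;

    CancelHandlingSketch(final Connection connection, final AsyncConnectionSource connectionSource,
                         final long serverCursorId) {
        this.connection = connection;
        this.connectionSource = connectionSource;
        this.serverCursorId = serverCursorId;
    }

    void onSubscriptionCancel() {
        // 1. Synchronously close (not pool-return) the connection, which closes the underlying
        //    Stream and thereby cancels the pending Stream.readAsync.
        connection.close();
        // 2. Asynchronously borrow another connection and kill the orphaned server cursor.
        connectionSource.getConnectionAsync((newConnection, t) -> {
            if (t == null) {
                killCursors(newConnection, serverCursorId);
                newConnection.release();
            }
        });
    }

    private void killCursors(final Connection c, final long cursorId) {
        // would issue the killCursors command on c for cursorId
    }
}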

None of the reactive driver operations currently behaves as described above; instead, they all handle cancel signals by remembering them (pending cancel) and acting later, when a pending action related to the corresponding subscription returns. We will not refactor the implementation to use the aforementioned approach; instead, we will only fix AsyncQueryBatchCursor, which does not act on the cancel signal even when a pending getMore returns [1], unless it produces any results (see AsyncQueryBatchCursor.handleGetMoreQueryResult).

Further discussion of the bug we have in this scenario and its solution continues in the comments below.

Scenario 3)

We need Reactor to notify the driver when it decides that a subscription is terminated as a result of Subscriber.onNext throwing an exception. It appears that the Flux.onError* methods (and maybe Flux.doOnTerminate, Flux.doOnError) are the levers we need to pull to achieve the desired behavior. However, I did not have any success experimenting with these methods in BatchCursorPublisher.subscribe. Currently this problem looks like a rabbit hole that deserves its own bug report.


[1] One may also imagine releasing the borrowed connection when getMore returns without any results and there is no pending cancel event, to give other tasks a chance to use the connection. We are not going to make such a change.

Comment by Ross Lawley [ 15/Jan/21 ]

I think JAVA-3938 highlights this issue and is a duplicate. The scenario is where the subscription is cancelled when a getMore is in flight. The connection won't be returned until the getMore returns.
