[JAVA-3938] Change stream connections not returned Created: 14/Jan/21  Updated: 05/Apr/21  Resolved: 11/Feb/21

Status: Closed
Project: Java Driver
Component/s: Async, Change Streams
Affects Version/s: 4.1.0
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Ross Lawley Assignee: Ross Lawley
Resolution: Duplicate Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate
duplicates JAVA-3907 AsyncQueryBatchCursor does not releas... Closed
is duplicated by JAVA-3907 AsyncQueryBatchCursor does not releas... Closed
Related
is related to JAVA-3487 com.mongodb.MongoException: state sho... Closed

 Description   

Where the max connections is 100 the following code will fail to complete:

    public static void main(final String[] args) {
        MongoClient mongoClient = MongoClients.create("mongodb://localhost");
        MongoDatabase db = mongoClient.getDatabase("import");
        MongoCollection<Document> gameEntityCollection = db.getCollection("gameEnitites");
        for (int i = 0; i < 200; i++) {
            final int subscribeNumber = i;
            System.out.println("Subscribe number " + subscribeNumber);
            gameEntityCollection.watch().subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(1);
                    new Thread(() -> {
                        try {
                            Thread.sleep(1000);
                            s.cancel();
                            System.out.println("Cancelled Subscribe number " + subscribeNumber);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }).start();
                }
 
                @Override
                public void onNext(ChangeStreamDocument<Document> t) {
                }
 
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
 
                @Override
                public void onComplete() {
                    System.out.println("Completed Subscribe number " + subscribeNumber);
                }
            });
            CountDownLatch cd = new CountDownLatch(1);
            gameEntityCollection.countDocuments().subscribe(new Subscriber<Long>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }
 
                @Override
                public void onNext(Long aLong) {
                    System.out.println("Subscribe number " + subscribeNumber + " collection size: " + aLong);
                }
 
                @Override
                public void onError(Throwable t) {
                }
 
                @Override
                public void onComplete() {
                    cd.countDown();
                }
            });
            cd.await();
        }
    }



 Comments   
Comment by Githook User [ 05/Apr/21 ]

Author:

{'name': 'Valentin Kovalenko', 'email': 'valentin.kovalenko@mongodb.com', 'username': 'stIncMale'}

Message: Backport to 4.2.x JAVA-4044/PR#689 and the changes it depends on (JAVA-3938 & JAVA-3907 / PR#661) (#691)

  • Regression test for change stream cancellation (#661)

Ensures that all sessions are returned to the pool

JAVA-3938 JAVA-3907

  • Guarantee that ChangeStreamPublisher for a collection completes after dropping the collection (#689)

Before the changes made within JAVA-3973,
ChangeStreamPublisher had been terminating with onError.
After the changes in JAVA-3973 neither onError nor onComplete is called,
but those changes allow us to terminated it with onComplete.
I could have specified only assertTerminalEvent()
without specifying assertNoErrors(), thus accepting either onComplete or onError
(the old behavior), but terminating with onComplete is nicer.

The approach with using startAtOperationTime to ensure that
a change stream is guaranteed to observe collection.drop()
works only if there is no leader re-election that results in
rolling back the delete operation from which the operationTime
was extracted. While such rollback can be prevented by using
the "majority" write concern, the common approach in driver tests
is to not use it for efficiency and tolerate a tiny chance of
experiencing a rollback.

JAVA-4044

Co-authored-by: Ross Lawley <ross.lawley@gmail.com>
Branch: 4.2.x
https://github.com/mongodb/mongo-java-driver/commit/77c8795eec482f909cfb658b46771b51c2ded7e0

Comment by Githook User [ 08/Feb/21 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Regression test for change stream cancellation (#661)

Ensures that all sessions are returned to the pool

JAVA-3938 JAVA-3907
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/67a3adec5fef87c88adbf2124e242293d8f6fea5

Comment by Ross Lawley [ 05/Feb/21 ]

Adding regression test

Comment by Ross Lawley [ 15/Jan/21 ]

The issue here is on a tailable cursor getMore waits for a result before returning, so the cancellation is scheduled but it requires the getMore to return before its processed.

If the getMore returns the pending close will happen and the connection is released.

Its probably best that the fix for JAVA-3907 is done alongside the fix for this ticket.

Comment by Ross Lawley [ 14/Jan/21 ]

Note: This could be related to JAVA-3850 and may be fixed in master.

Generated at Thu Feb 08 09:00:49 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.