[JAVA-4642] Replace synchronized block with ReentrantLock in BaseCluster Created: 11/Jun/22  Updated: 28/Oct/23  Resolved: 27/Jul/22

Status: Closed
Project: Java Driver
Component/s: Internal
Affects Version/s: None
Fix Version/s: 4.8.0

Type: Improvement Priority: Major - P3
Reporter: Jeffrey Yemin Assignee: Jeffrey Yemin
Resolution: Fixed Votes: 0
Labels: loom
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Attachments: Zip Archive JAVA4642.zip     Java Source File StructuredConcurrencyTest.java     File dump.json     File dump2.json    
Epic Link: Virtual Threads Support

 Description   

While testing JEP 425/428 with a JDK 19 early-access build (build 26), I was able to create a scenario in which all virtual threads are starved of carrier threads, because every available carrier thread is pinned by a synchronized block.

Multiple virtual threads are observed (via jcmd) to be blocked waiting to enter the synchronized block in BaseCluster#withLock. Meanwhile, one thread is observed to be in that same block and, within that block, waiting to acquire the ReentrantLock in ConcurrentPool#lockUnfair. Another thread is also trying to acquire that same lock. No threads appear to actually hold that lock.

So what I think is happening is this: neither of the two threads waiting to acquire the ReentrantLock is able to do so, because all available carrier threads are pinned by other virtual threads waiting to enter the synchronized block in BaseCluster#withLock. In particular, the thread that holds the BaseCluster lock cannot get a carrier to unwind its stack and release that lock.

We can avoid this scenario by replacing the synchronized block in BaseCluster#withLock with a ReentrantLock.
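A minimal sketch of that change, assuming withLock simply runs an action while holding a lock. BaseCluster's actual code is not shown in this ticket, so ClusterStateHolder, its version field and incrementVersion are hypothetical stand-ins used only to illustrate the synchronized-to-ReentrantLock pattern:

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical stand-in for BaseCluster; not the driver's actual code.
final class ClusterStateHolder {
    // Replaces the intrinsic monitor that synchronized (this) would use.
    private final ReentrantLock lock = new ReentrantLock();
    // Hypothetical in-memory state guarded by the lock.
    private int version;

    // Before (hypothetical): <T> T withLock(Supplier<T> action) { synchronized (this) { return action.get(); } }
    <T> T withLock(final Supplier<T> action) {
        // A virtual thread blocked here parks and can release its carrier.
        lock.lock();
        try {
            return action.get();
        } finally {
            // Released even if the action throws.
            lock.unlock();
        }
    }

    int incrementVersion() {
        return withLock(() -> ++version);
    }

    public static void main(final String[] args) throws InterruptedException {
        ClusterStateHolder holder = new ClusterStateHolder();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    holder.incrementVersion();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println(holder.withLock(() -> holder.version)); // prints 4000
    }
}
```

Unlike a synchronized block, a ReentrantLock is not released automatically when the block exits, so the unlock() call has to sit in a finally clause.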

Note: this scenario seems to contradict this part of JEP 425:

Pinning does not make an application incorrect, but it might hinder its scalability. If a virtual thread performs a blocking operation such as I/O or BlockingQueue.take() while it is pinned, then its carrier and the underlying OS thread are blocked for the duration of the operation. Frequent pinning for long durations can harm the scalability of an application by capturing carriers.

The scheduler does not compensate for pinning by expanding its parallelism. Instead, avoid frequent and long-lived pinning by revising synchronized blocks or methods that run frequently and guard potentially long I/O operations to use java.util.concurrent.locks.ReentrantLock instead. There is no need to replace synchronized blocks and methods that are used infrequently (e.g., only performed at startup) or that guard in-memory operations. As always, strive to keep locking policies simple and clear.

Specifically this part:

There is no need to replace synchronized blocks and methods that ... guard in-memory operations.

as this lock is guarding in-memory operations.

JEP 425 does state that:

In a future release, we may be able to remove the first limitation above (pinning inside synchronized).

But it's not clear if that improvement will come before or after JEP 425 becomes a non-preview feature. And even so, we would like the driver to work well with JEP 425/428 even while they are still in preview.



 Comments   
Comment by Githook User [ 27/Jul/22 ]

Author:

Jeff Yemin (username: jyemin, email: jeff.yemin@mongodb.com)

Message: Replace synchronized with ReentrantLock (#984)

JAVA-4642
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/3ef3747d39fba98eb041f915e2377df7afd46c08

Comment by Valentin Kavalenka [ 23/Jun/22 ]

We got a confirmation that the behavior reported in this ticket is intended. This means we should just replace all synchronized methods and blocks with Java SE API locks (see JAVA-5105).
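Replacing every synchronized method also means replacing any Object.wait()/notifyAll() calls made on that monitor, since those are only legal while holding it. A minimal sketch of how a ReentrantLock plus Condition objects take over both roles; BoundedBuffer is a hypothetical example class, not driver code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class; not taken from the driver.
final class BoundedBuffer<E> {
    private final ReentrantLock lock = new ReentrantLock();
    // Each Condition replaces wait()/notifyAll() on the object monitor.
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(final int capacity) {
        this.capacity = capacity;
    }

    // Before: synchronized void put(E e) { while (full) wait(); add(e); notifyAll(); }
    void put(final E item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E item = items.removeFirst();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 100; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += buffer.take();
        }
        producer.join();
        System.out.println(sum); // prints 5050
    }
}
```

Using two conditions instead of one monitor lets put() wake only consumers and take() wake only producers, which the single wait set of a synchronized monitor cannot express.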

Comment by Valentin Kavalenka [ 23/Jun/22 ]

Asked a question on the loom-dev mailing list; awaiting a response here: https://mail.openjdk.org/pipermail/loom-dev/2022-June/004750.html

Comment by Valentin Kavalenka [ 21/Jun/22 ]

I agree: trying to enter a synchronized block pins the virtual thread to its carrier. Since this arguably contradicts JEP 425, I created a reproducer that is not specific to the driver. Here is how it works:

  1. Instructs the JVM to have at most 2 carriers by setting the system property jdk.virtualThreadScheduler.parallelism (this is the only way to control the number of carriers).
  2. Schedules two tasks (taskA) that try to enter a synchronized block and then pin the virtual thread to the carrier with an infinite loop. If waiting in a queue to enter a synchronized block does not pin, then these tasks must not result in more than one pinned thread.
  3. Schedules one more task (taskB) that prints the description of its virtual thread and completes. Since we are allowed to have 2 carriers, and at most 1 of them is expected to have a virtual thread pinned to it, we expect taskB to be executed eventually. However, this does not happen.

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
 
final class JAVA_4642 {
    public static void main(final String... args) throws InterruptedException {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Start two taskA virtual threads that contend for the same monitor.
            IntStream.range(0, 2).forEach(i -> executor.submit(JAVA_4642::taskA));
            // Give both taskA threads time to reach withSynchronized.
            Thread.sleep(Duration.ofSeconds(1));
            // Expected to run if at most one of the two carriers is pinned.
            executor.submit(JAVA_4642::taskB);
            executor.awaitTermination(1000, TimeUnit.SECONDS);
        }
    }
 
    private static void taskA() {
        System.out.printf("taskA %s%n", Thread.currentThread());
        withSynchronized(JAVA_4642::pinWithInfinitLoop);
    }
 
    private static void taskB() {
        System.out.printf("taskB %s%n", Thread.currentThread());
    }
 
    // Never returns, so the lock owner keeps its carrier busy forever.
    private static void pinWithInfinitLoop() {
        while (true) {
        }
    }
 
    // static synchronized: all callers contend for the JAVA_4642.class monitor.
    private static synchronized void withSynchronized(final Runnable action) {
        System.out.printf("%s has the lock%n", Thread.currentThread());
        action.run();
    }
 
    private static void sleep(final Duration d) {
        try {
            Thread.sleep(d);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

If we run the application, we see

$ ~/Downloads/jdk-19.jdk/Contents/Home/bin/javac --release 19 --enable-preview ./JAVA_4642.java &&
 ~/Downloads/jdk-19.jdk/Contents/Home/bin/java --enable-preview -Djdk.virtualThreadScheduler.parallelism=2 JAVA_4642
 
taskA VirtualThread[#23]/runnable@ForkJoinPool-1-worker-2
taskA VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-2 has the lock

In the thread dump

jcmd $(jps | grep JAVA_4642 | cut -d " " -f1) Thread.dump_to_file -overwrite -format=json threads.json

we see that both carrier threads are executing a task:

{
  "container": "ForkJoinPool-1\/jdk.internal.vm.SharedThreadContainer@1863395d",
  "parent": "<root>",
  "owner": null,
  "threads": [
   {
     "tid": "22",
     "name": "ForkJoinPool-1-worker-1",
     "stack": [
        "java.base\/jdk.internal.vm.Continuation.run(Continuation.java:260)",
        "java.base\/java.lang.VirtualThread.runContinuation(VirtualThread.java:213)",
        "java.base\/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1423)",
        "java.base\/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)",
        "java.base\/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)",
        "java.base\/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)",
        "java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)",
        "java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)"
     ]
   },
   {
     "tid": "24",
     "name": "ForkJoinPool-1-worker-2",
     "stack": [
        "java.base\/jdk.internal.vm.Continuation.run(Continuation.java:257)",
        "java.base\/java.lang.VirtualThread.runContinuation(VirtualThread.java:213)",
        "java.base\/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1423)",
        "java.base\/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)",
        "java.base\/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)",
        "java.base\/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)",
        "java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)",
        "java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)"
     ]
   }
  ],
  "threadCount": "2"
}

both taskA threads are running (one is pinned by pinWithInfinitLoop; the other is, unfortunately, pinned while trying to enter the synchronized block in withSynchronized), and taskB ("tid": "25") has not even started because no carriers are left available:

{
  "container": "java.util.concurrent.ThreadPerTaskExecutor@51e34497",
  "parent": "<root>",
  "owner": null,
  "threads": [
   {
     "tid": "21",
     "name": "",
     "stack": [
        "JAVA_4642.withSynchronized(JAVA_4642.java:39)",
        "JAVA_4642.taskA(JAVA_4642.java:26)",
        "java.base\/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)",
        "java.base\/java.util.concurrent.ThreadPerTaskExecutor$ThreadBoundFuture.run(ThreadPerTaskExecutor.java:352)",
        "java.base\/java.lang.VirtualThread.run(VirtualThread.java:287)",
        "java.base\/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:174)",
        "java.base\/jdk.internal.vm.Continuation.enter0(Continuation.java:327)",
        "java.base\/jdk.internal.vm.Continuation.enter(Continuation.java:320)"
     ]
   },
   {
     "tid": "23",
     "name": "",
     "stack": [
        "JAVA_4642.pinWithInfinitLoop(JAVA_4642.java:34)",
        "JAVA_4642.withSynchronized(JAVA_4642.java:40)",
        "JAVA_4642.taskA(JAVA_4642.java:26)",
        "java.base\/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)",
        "java.base\/java.util.concurrent.ThreadPerTaskExecutor$ThreadBoundFuture.run(ThreadPerTaskExecutor.java:352)",
        "java.base\/java.lang.VirtualThread.run(VirtualThread.java:287)",
        "java.base\/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:174)",
        "java.base\/jdk.internal.vm.Continuation.enter0(Continuation.java:327)",
        "java.base\/jdk.internal.vm.Continuation.enter(Continuation.java:320)"
     ]
   },
   {
     "tid": "25",
     "name": "",
     "stack": [
     ]
   }
  ],
  "threadCount": "3"
}
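For comparison, a minimal sketch of the same reproducer with the static synchronized method replaced by a ReentrantLock, which is what this ticket proposes for BaseCluster#withLock. Assumptions: Java4642LockVariant, taskBDone, stop and the 30-second timeout are hypothetical; the holder's infinite loop is replaced by a busy-wait that ends once taskB has run, so the program terminates; and setting jdk.virtualThreadScheduler.parallelism from code is assumed to take effect only if no virtual thread exists yet. It needs JDK 21 or later (or JDK 19/20 with --enable-preview):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of JAVA_4642 using a ReentrantLock instead of a static synchronized method.
final class Java4642LockVariant {
    private final ReentrantLock lock = new ReentrantLock();
    private final CountDownLatch taskBDone = new CountDownLatch(1);
    // Lets a spinning taskA give up if taskB never runs, so the executor can close.
    private final AtomicBoolean stop = new AtomicBoolean();

    boolean run() throws InterruptedException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 2; i++) {
                executor.submit(this::taskA);
            }
            Thread.sleep(1000); // let both taskA threads reach lock()
            executor.submit(this::taskB);
            boolean ran = taskBDone.await(30, TimeUnit.SECONDS);
            stop.set(true);
            return ran;
        } // close() waits for all submitted tasks to finish
    }

    private void taskA() {
        // The waiting taskA parks here instead of pinning its carrier.
        lock.lock();
        try {
            // Occupy the carrier like pinWithInfinitLoop, but only until taskB has run.
            while (taskBDone.getCount() > 0 && !stop.get()) {
                Thread.onSpinWait();
            }
        } finally {
            lock.unlock();
        }
    }

    private void taskB() {
        System.out.printf("taskB %s%n", Thread.currentThread());
        taskBDone.countDown();
    }

    public static void main(final String... args) throws InterruptedException {
        // Assumed equivalent to -Djdk.virtualThreadScheduler.parallelism=2 when set before any virtual thread starts.
        System.setProperty("jdk.virtualThreadScheduler.parallelism", "2");
        System.out.println(new Java4642LockVariant().run() ? "taskB ran" : "taskB starved");
    }
}
```

Under these assumptions, the second taskA parks in lock() and frees its carrier, so taskB should get the second carrier even though the lock holder never yields its own.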

Comment by Jeffrey Yemin [ 11/Jun/22 ]

Note: I can't reproduce this on the master branch, but I can with the 4.6.x branch. My hypothesis is that the changes to the buffer and session pools somehow hide the issue.

Generated at Thu Feb 08 09:02:36 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.