|
Accorind to the client session specification:
ClientSession instances are not thread safe or fork safe. They can only be used by one thread or process at a time.
Subscribing to multiple publishers in a loop is against the specification and results in non-determined behaviour. The initial error is one such example of this. Further more the later comment doesn't actually conform to specification either.
The only correct way to use ClientSessions is to do in a way that is one at a time - eg only after the previous operation with the session is complete can the next operation start.
JAVA-4538 tracks the improvements to the API documentation and a documentation ticket has also been filed to provide examples and warnings in the main documentation.
Closing this ticket as Works as Designed - as the example incorrectly uses the Client Session.
|
|
Please see final comment.
I've updated the example in ClientSessionTest.java and LatchedSubscriber.java . Using a CommandListener outputted more debugging:
[main] INFO ClientSessionTest - Started transaction:: txn: 497
|
[main] INFO ClientSessionTest - Inserting docs
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: true : ConnId: connectionId{localValue:12, serverValue:206} : RequestId: 5464
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:10, serverValue:205} : RequestId: 5465
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:14, serverValue:207} : RequestId: 5466
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:7, serverValue:203} : RequestId: 5467
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:46, serverValue:209} : RequestId: 5468
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:8, serverValue:204} : RequestId: 5469
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:24, serverValue:208} : RequestId: 5470
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:12, serverValue:206} : RequestId: 5471
|
[AsyncGetter-2-thread-1] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:7, serverValue:203} : RequestId: 5472
|
[Thread-18] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:52, serverValue:210}] to localhost:27017
|
[Thread-18] INFO ClientSessionTest - Command Started: txtNumber: 497 : startTransaction: false : ConnId: connectionId{localValue:52, serverValue:210} : RequestId: 5474
|
[Thread-8] WARN LatchedSubscriber - Publisher failed
|
So the conclusion is more to do with how startTransaction is added - chronologically it all looks correct from the logs, but the approach below subtly hides a race:
clientSession.startTransaction();
|
LOGGER.info("Started transaction:: txn: {}", clientSession.getServerSession().getTransactionNumber());
|
List<Publisher<?>> publisherList = new ArrayList<>();
|
for (int i = 0; i < BATCH_SIZE; i++) {
|
publisherList.add(collection.insertOne(clientSession, Document.parse("{\"test\": \"test\"}")));
|
}
|
LOGGER.info(" Inserting docs");
|
waitForPublishers(publisherList.toArray(new Publisher<?>[0])); // <<< The race is hidden here <<<
|
From the transaction specification:
Behavior of the startTransaction field
The first command within a multi-statement transaction MUST include startTransaction:true. Subsequent commands MUST NOT include the startTransaction field.
By subscribing to all insert publishers the first command to be issued to the server is not guaranteed - as the network latency between threads could be racy.
The lack of blocking on startTransaction makes sense for synchronous drivers but hides an issue with asynchronous drivers where transactions contain multiple operations where ordering of operations is not important.
Changing the example code to block on the first insert solves the issue:
// Block on first insert...
|
waitForPublisher(collection.insertOne(clientSession, Document.parse("{\"test\": \"test\"}")), (x) ->{});
|
List<Publisher<?>> publisherList = new ArrayList<>();
|
for (int i = 1; i < BATCH_SIZE; i++) {
|
publisherList.add(collection.insertOne(clientSession, Document.parse("{\"test\": \"test\"}")));
|
}
|
waitForPublishers(publisherList.toArray(new Publisher<?>[0]));
|
|