Details
-
Bug
-
Resolution: Works as Designed
-
Major - P3
-
None
-
2.10.2
-
None
-
None
-
(copied to CRM)
Description
When multiple BulkWriteAsync tasks are awaited with Task.WhenAll inside a transaction, and no other operation has previously been performed on the MongoClient, the transaction fails with the error: "Only servers in a sharded cluster can start a new transaction at the active transaction number".
For example, the following would fail:
// ...

// Leaving the next line commented out reproduces the failure; enabling it
// performs one operation on the client before the transaction is started.
// _db.RunCommand((Command<BsonDocument>)"{ping:1}");

using (var session = await _client.StartSessionAsync().ConfigureAwait(false))
{
    session.StartTransaction();

    // Two independent batches, each inserting two timestamped documents.
    var firstBatch = new List<WriteModel<BsonDocument>>
    {
        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
    };
    var secondBatch = new List<WriteModel<BsonDocument>>
    {
        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
    };

    // Both bulk writes are started on the SAME session before either one is
    // awaited, so they are in flight together inside the transaction.
    var firstWrite = _collection1.BulkWriteAsync(session, firstBatch);
    var secondWrite = _collection2.BulkWriteAsync(session, secondBatch);

    await Task.WhenAll(firstWrite, secondWrite).ConfigureAwait(false);
    await session.CommitTransactionAsync().ConfigureAwait(false);
}

// ...
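However, un-commenting the `_db.RunCommand` ping at the top of the example, so that one operation runs on the client before the transaction starts, causes the same code to run successfully.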
--------------------
Reproduction:
# Initialise a local 3-node replica set with mlaunch.
mlaunch init --replicaset --nodes 3
|
# Insert one empty document into test.collection1 before running the test
# (presumably so the collection already exists when the transaction starts; confirm).
mongo --eval 'db.getSiblingDB("test").collection1.insert({})'
|
# Same for test.collection2.
mongo --eval 'db.getSiblingDB("test").collection2.insert({})'
|
using Microsoft.VisualStudio.TestTools.UnitTesting; |
using MongoDB.Bson; |
using MongoDB.Driver; |
using Shouldly; |
using System; |
using System.Collections.Generic; |
using System.Diagnostics; |
using System.Threading.Tasks; |
|
|
namespace MongoDBCSharpTests
{
    /// <summary>
    /// Reproduction tests for running two <c>BulkWriteAsync</c> calls concurrently
    /// (via <see cref="Task.WhenAll(Task[])"/>) on one session inside a transaction.
    /// </summary>
    /// <remarks>
    /// The tests need a replica set named <c>replset</c> reachable at
    /// <c>localhost:27017</c>, with database <c>test</c>.
    /// </remarks>
    [TestClass]
    public class UnitTest2
    {
        private const string ConnectionString = "mongodb://localhost:27017/test?replicaSet=replset";

        private IMongoDatabase _db;
        private MongoClient _client;
        private IMongoCollection<BsonDocument> _collection1;
        private IMongoCollection<BsonDocument> _collection2;

        /// <summary>
        /// Creates a fresh client plus database/collection handles before each test.
        /// No command is sent to the server here.
        /// </summary>
        [TestInitialize]
        public void Setup()
        {
            _client = new MongoClient(ConnectionString);
            _db = _client.GetDatabase("test");
            _collection1 = _db.GetCollection<BsonDocument>("collection1");
            _collection2 = _db.GetCollection<BsonDocument>("collection2");
        }

        /// <summary>
        /// Starts a session and a transaction, issues two bulk inserts (one per
        /// collection) concurrently on that session, then commits.
        /// </summary>
        /// <returns>A task that completes once the transaction has been committed.</returns>
        /// <exception cref="Exception">
        /// Any exception raised by the bulk writes or the commit is logged and
        /// rethrown unchanged after the transaction has been aborted.
        /// </exception>
        public async Task RunTest()
        {
            using (var session = await _client.StartSessionAsync().ConfigureAwait(false))
            {
                session.StartTransaction();

                try
                {
                    var writeModel1 = new List<WriteModel<BsonDocument>>
                    {
                        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
                        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
                    };
                    var writeModel2 = new List<WriteModel<BsonDocument>>
                    {
                        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
                        new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }),
                    };

                    // Both writes are deliberately started before either is awaited:
                    // concurrent use of one session is the scenario being reproduced.
                    // NOTE(review): sessions are presumably not meant to be shared by
                    // concurrent operations - confirm against the driver documentation.
                    var task1 = _collection1.BulkWriteAsync(session, writeModel1);
                    var task2 = _collection2.BulkWriteAsync(session, writeModel2);

                    await Task.WhenAll(task1, task2).ConfigureAwait(false);
                    await session.CommitTransactionAsync().ConfigureAwait(false);
                    Debug.WriteLine("Committed");
                }
                catch (Exception ex)
                {
                    // Only abort while the transaction is still active; aborting after
                    // a commit attempt could throw and hide the original failure.
                    if (session.IsInTransaction)
                    {
                        await session.AbortTransactionAsync().ConfigureAwait(false);
                    }

                    Debug.WriteLine("Error... Aborting");
                    Debug.WriteLine(ex.Message);

                    // "throw;" (not "throw ex;") keeps the original stack trace.
                    throw;
                }

                Debug.WriteLine("Completed");
            }
        }

        /// <summary>
        /// With no prior operation on the client, <see cref="RunTest"/> is expected
        /// to fail with a <c>MongoCommandException</c> carrying the transaction-number error.
        /// </summary>
        [TestMethod]
        public void ShouldFailWithMongoCommandException()
        {
            Should.Throw<MongoCommandException>(() => RunTest())
                .Message
                .ShouldContain("Only servers in a sharded cluster can start a new transaction at the active transaction number");
        }

        /// <summary>
        /// After a <c>ping</c> command has been run on the database,
        /// <see cref="RunTest"/> is expected to complete without throwing.
        /// </summary>
        [TestMethod]
        public void ShouldSucceed()
        {
            _db.RunCommand((Command<BsonDocument>)"{ping:1}");

            Should.NotThrow(() => RunTest());
        }
    }
}
|