[CSHARP-2974] Task.WhenAll fails with transaction error when awaiting multiple BulkWrite operations Created: 20/Feb/20  Updated: 27/Oct/23  Resolved: 04/Mar/20

Status: Closed
Project: C# Driver
Component/s: None
Affects Version/s: 2.10.2
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Alex Bevilacqua Assignee: Robert Stam
Resolution: Works as Designed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Case:

 Description   

When multiple BulkWriteAsync tasks on the same session are awaited with Task.WhenAll inside a transaction, and no other operation has been performed on the MongoClient beforehand, the transaction fails with: Only servers in a sharded cluster can start a new transaction at the active transaction number

For example, the following would fail:

// ...
// _db.RunCommand((Command<BsonDocument>)"{ping:1}");
using (var session = await _client.StartSessionAsync().ConfigureAwait(false))
{
    session.StartTransaction();
    var writeModel1 = new List<WriteModel<BsonDocument>>();
    writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
    writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
    var writeModel2 = new List<WriteModel<BsonDocument>>();
    writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
    writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
 
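    // Both bulk writes are started before either is awaited,
    // so they run concurrently on the same session.
    var task1 = _collection1.BulkWriteAsync(session, writeModel1);
    var task2 = _collection2.BulkWriteAsync(session, writeModel2);
    await Task.WhenAll(task1, task2).ConfigureAwait(false);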
    await session.CommitTransactionAsync().ConfigureAwait(false);
}
// ...

Un-commenting the _db.RunCommand call at the top, however, lets the same code run successfully.

--------------------

Reproduction:

mlaunch init --replicaset --nodes 3
mongo --eval 'db.getSiblingDB("test").collection1.insert({})'
mongo --eval 'db.getSiblingDB("test").collection2.insert({})'

using Microsoft.VisualStudio.TestTools.UnitTesting;
using MongoDB.Bson;
using MongoDB.Driver;
using Shouldly;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
 
namespace MongoDBCSharpTests
{
    [TestClass]
    public class UnitTest2
    {
        private const string connString = "mongodb://localhost:27017/test?replicaSet=replset";
        private IMongoDatabase _db;
        private MongoClient _client;
        private IMongoCollection<BsonDocument> _collection1;
        private IMongoCollection<BsonDocument> _collection2;
 
        [TestInitialize]
        public void Setup()
        {
            _client = new MongoClient(connString);
            _db = _client.GetDatabase("test");
            _collection1 = _db.GetCollection<BsonDocument>("collection1");
            _collection2 = _db.GetCollection<BsonDocument>("collection2");
        }
 
        public async Task RunTest()
        {
            using (var session = await _client.StartSessionAsync().ConfigureAwait(false))
            {
                session.StartTransaction();
 
                try
                {
                    var writeModel1 = new List<WriteModel<BsonDocument>>();
                    writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
                    writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
                    var writeModel2 = new List<WriteModel<BsonDocument>>();
                    writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
                    writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
 
                    var task1 = _collection1.BulkWriteAsync(session, writeModel1);
                    var task2 = _collection2.BulkWriteAsync(session, writeModel2);
                    await Task.WhenAll(task1, task2).ConfigureAwait(false);
                    await session.CommitTransactionAsync().ConfigureAwait(false);
                    Debug.WriteLine("Committed");
                }
                catch (Exception ex)
                {
                    await session.AbortTransactionAsync().ConfigureAwait(false);
                    Debug.WriteLine("Error... Aborting");
                    Debug.WriteLine(ex.Message);
                    throw; // rethrow, preserving the original stack trace
                }
 
                Debug.WriteLine("Completed");
            }
        }
 
        [TestMethod]
        public void ShouldFailWithMongoCommandException()
        {
            Should.Throw<MongoCommandException>(() => RunTest()).Message.ShouldContain("Only servers in a sharded cluster can start a new transaction at the active transaction number");
        }
 
        [TestMethod]
        public void ShouldSucceed()
        {
            _db.RunCommand((Command<BsonDocument>)"{ping:1}");
            Should.NotThrow(() => RunTest());
        }
    }
}



 Comments   
Comment by Sujesh Arukil [ 04/Mar/20 ]

I don't think it is a race condition. All that is needed is any operation on the db beforehand to trigger some activity, like this:

_db.RunCommand((Command<BsonDocument>)"{ping:1}");

and then this succeeds:

openSession
  openTransaction
  await Task.WhenAll(....)
  CommitTransaction

The extra operation is not needed when not using a transaction.

Comment by Robert Stam [ 04/Mar/20 ]

I wondered about that too.

It's probably a race condition. If one call to `BulkWriteAsync` happens to complete before the other starts, the code could succeed.

Comment by Sujesh Arukil [ 04/Mar/20 ]

That may be so, but it does not explain why performing any operation on the db first and then calling Task.WhenAll works. Is there an explanation for that?

Comment by Robert Stam [ 04/Mar/20 ]

Sessions are not multi-thread (or multi-Task) safe.

Only one operation at a time can be performed on each session.
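
A minimal sketch of what this implies for the code in the description, assuming the two bulk writes do not need to overlap: each BulkWriteAsync is awaited before the next one starts, so the session has only one operation in flight at a time. The class and method names (SequentialTransactionSketch, WriteBothAsync) are hypothetical and not part of the driver, and this has not been run against the reporter's setup.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper for illustration only; not part of MongoDB.Driver.
public static class SequentialTransactionSketch
{
    public static async Task WriteBothAsync(
        IMongoClient client,
        IMongoCollection<BsonDocument> collection1,
        IMongoCollection<BsonDocument> collection2,
        IEnumerable<WriteModel<BsonDocument>> writes1,
        IEnumerable<WriteModel<BsonDocument>> writes2)
    {
        using (var session = await client.StartSessionAsync().ConfigureAwait(false))
        {
            session.StartTransaction();
            try
            {
                // Await the first bulk write to completion before starting
                // the second, instead of combining them with Task.WhenAll.
                await collection1.BulkWriteAsync(session, writes1).ConfigureAwait(false);
                await collection2.BulkWriteAsync(session, writes2).ConfigureAwait(false);

                await session.CommitTransactionAsync().ConfigureAwait(false);
            }
            catch
            {
                await session.AbortTransactionAsync().ConfigureAwait(false);
                throw; // rethrow without resetting the stack trace
            }
        }
    }
}
```

Both writes still commit or abort together; only the concurrency between them is given up. Writes that truly have to run in parallel would presumably each need their own session, and so could not share a single transaction.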
