Uploaded image for project: 'C# Driver'
  1. C# Driver
  2. CSHARP-2974

Task.WhenAll fails with transaction error when awaiting multiple BulkWrite operations

    XMLWordPrintableJSON

Details

    • Icon: Bug Bug
    • Resolution: Works as Designed
    • Icon: Major - P3 Major - P3
    • None
    • 2.10.2
    • None
    • None

    Description

      When awaiting multiple BulkWriteAsync tasks using Task.WhenAll wrapped in a transaction, if no other operations have been performed on the MongoClient the transaction will fail with: Only servers in a sharded cluster can start a new transaction at the active transaction number

      For example, the following would fail:

      // ...
      // _db.RunCommand((Command<BsonDocument>)"{ping:1}");
      using (var session = await _client.StartSessionAsync().ConfigureAwait(false))
      {
          session.StartTransaction();
          var writeModel1 = new List<WriteModel<BsonDocument>>();
          writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
          writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
          var writeModel2 = new List<WriteModel<BsonDocument>>();
          writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
          writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
       
          var task1 = _collection1.BulkWriteAsync(session, writeModel1);
          var task2 = _collection2.BulkWriteAsync(session, writeModel2);
          await Task.WhenAll(task1, task2).ConfigureAwait(false);
          await session.CommitTransactionAsync().ConfigureAwait(false);
      }
      // ...
      

      but un-commenting the db operation would cause the code to run successfully.

      --------------------

      Reproduction:

      mlaunch init --replicaset --nodes 3
      mongo --eval 'db.getSiblingDB("test").collection1.insert({})'
      mongo --eval 'db.getSiblingDB("test").collection2.insert({})'
      

      using Microsoft.VisualStudio.TestTools.UnitTesting;
      using MongoDB.Bson;
      using MongoDB.Driver;
      using Shouldly;
      using System;
      using System.Collections.Generic;
      using System.Diagnostics;
      using System.Threading.Tasks;
       
      namespace MongoDBCSharpTests
      {
          [TestClass]
          public class UnitTest2
          {
              private const string connString = "mongodb://localhost:27017/test?replicaSet=replset";
              private IMongoDatabase _db;
              private MongoClient _client;
              private IMongoCollection<BsonDocument> _collection1;
              private IMongoCollection<BsonDocument> _collection2;
       
              [TestInitialize]
              public void Setup()
              {
                  _client = new MongoClient(connString);
                  _db = _client.GetDatabase("test");
                  _collection1 = _db.GetCollection<BsonDocument>("collection1");
                  _collection2 = _db.GetCollection<BsonDocument>("collection2");
              }
       
              public async Task RunTest()
              {
                  using (var session = await _client.StartSessionAsync().ConfigureAwait(false))
                  {
                      session.StartTransaction();
       
                      try
                      {
                          var writeModel1 = new List<WriteModel<BsonDocument>>();
                          writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
                          writeModel1.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
                          var writeModel2 = new List<WriteModel<BsonDocument>>();
                          writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
                          writeModel2.Add(new InsertOneModel<BsonDocument>(new BsonDocument { { "d", DateTime.Now } }));
       
                          var task1 = _collection1.BulkWriteAsync(session, writeModel1);
                          var task2 = _collection2.BulkWriteAsync(session, writeModel2);
                          await Task.WhenAll(task1, task2).ConfigureAwait(false);
                          await session.CommitTransactionAsync().ConfigureAwait(false);
                          Debug.WriteLine("Committed");
                      }
                      catch (Exception ex)
                      {
                          await session.AbortTransactionAsync().ConfigureAwait(false);
                          Debug.WriteLine("Error... Aborting");
                          Debug.WriteLine(ex.Message);
                          throw ex;
                      }
       
                      Debug.WriteLine("Completed");
                  }
              }
       
              [TestMethod]
              public void ShouldFailWithMongoCommandException()
              {
                  Should.Throw<MongoCommandException>(() => RunTest()).Message.ShouldContain("Only servers in a sharded cluster can start a new transaction at the active transaction number");
              }
       
              [TestMethod]
              public void ShouldSucceed()
              {
                  _db.RunCommand((Command<BsonDocument>)"{ping:1}");
                  Should.NotThrow(() => RunTest());
              }
          }
      }
      

      Attachments

        Activity

          People

            robert@mongodb.com Robert Stam
            alex.bevilacqua@mongodb.com Alex Bevilacqua
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: