diff --git a/repro.js b/repro.js new file mode 100644 index 00000000000..32796d991cf --- /dev/null +++ b/repro.js @@ -0,0 +1,78 @@ +import {configureFailPoint} from "jstests/libs/fail_point_util.js"; + +const dbName = "test"; +const collName = "coll"; +const collNs = dbName + "." + collName; + +const numMongos = 9; +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 3}, + rsOptions: { + setParameter: { + logComponentVerbosity: tojson({ + replication: {rollback: 2}, + sharding: {migration: 2, rangeDeleter: 2}, + transaction: 4, + tenantMigration: 4, + command: 2 + }) + } + }, + embeddedRouter: true, + mongos: numMongos +}); + +assert.commandWorked( + st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName})); + +const shard0routerPort = st.rs0.getPrimary().routerPort; + +const shard0router = (() => { + for (let i = 0; i < numMongos; ++i) { + const router = st["s" + i]; + if (router.port == shard0routerPort) { + return router; + } + } +})(); + +assert.commandWorked(shard0router.adminCommand({shardCollection: collNs, key: {x: 1}})); + +const coll = shard0router.getCollection(collNs); + +assert.commandWorked(coll.insertOne({x: 23, y: 1})); +assert.commandWorked(coll.insertOne({x: -23, y: -1})); +assert.commandWorked(shard0router.adminCommand( + {moveRange: collNs, toShard: st.shard0.shardName, min: {x: MinKey()}, max: {x: 0}})); +assert.commandWorked(shard0router.adminCommand( + {moveRange: collNs, toShard: st.shard1.shardName, min: {x: 0}, max: {x: MaxKey()}})); + +const fpPrimary = configureFailPoint( + st.rs0.getPrimary(), "transactionRouterReturnErrorOnFirstTry", {}, {times: 1}); + +const session = shard0router.getDB(dbName).getMongo().startSession({causalConsistency: true}); +const sessionColl = session.getDatabase(dbName).getCollection(collName); + +session.startTransaction_forTesting(); + +const doc = sessionColl.findOne({x: 23}); +assert.eq(doc.y, 1); + +const updateRes = sessionColl.runCommand('update', { + updates: [{q: {x: -23}, u: {$inc: {y: 1}}}], +}); + +assert.commandWorked(updateRes); +assert.eq(updateRes.n, 1, () => tojson(updateRes)); +assert.eq(updateRes.nModified, 1, () => tojson(updateRes)); + +const doc2 = sessionColl.findOne({x: -23}); +assert.eq(doc2.y, 0); + +assert.commandFailedWithCode(session.commitTransaction_forTesting(), ErrorCodes.WriteConflict); +assert.commandFailedWithCode(session.commitTransaction_forTesting(), ErrorCodes.NoSuchTransaction); + +fpPrimary.off(); + +st.stop(); diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index f516238b5cb..9b4ba0924dd 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -48,7 +48,9 @@ #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/concurrency/exception_util.h" #include "mongo/db/database_name.h" +#include "mongo/db/error_labels.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_level.h" #include "mongo/db/server_feature_flags_gen.h" @@ -80,6 +82,7 @@ #include "mongo/s/shard_cannot_refresh_due_to_locks_held_exception.h" #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" +#include "mongo/s/transaction_router_resource_yielder.h" #include "mongo/util/assert_util.h" #include "mongo/util/clock_source.h" #include "mongo/util/decorable.h" @@ -92,6 +95,9 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTransaction namespace mongo { + +MONGO_FAIL_POINT_DEFINE(transactionRouterReturnErrorOnFirstTry) + namespace { using namespace fmt::literals; @@ -1607,6 +1613,17 @@ BSONObj TransactionRouter::Router::_commitTransaction( // can't be durable as we haven't started commit on the write shard. uassertStatusOK(readOnlyWCE); } + + if (MONGO_unlikely(transactionRouterReturnErrorOnFirstTry.shouldFail())) { + LOGV2(1615237615, "Injecting WriteConflict"); + ErrorReply reply{0, + static_cast(ErrorCodes::WriteConflict), + "WriteConflict", + "WriteConflict error injected by fail point"}; + reply.setErrorLabels({{ErrorLabel::kTransientTransaction}}); + return reply.toBSON().getOwned(); + } + return sendCommitDirectlyToShards(opCtx, writeShards); } @@ -1905,6 +1922,12 @@ BSONObj TransactionRouter::Router::_commitWithRecoveryToken(OperationContext* op return attachTxnFieldsIfNeeded(opCtx, recoveryShardId, rawCoordinateCommit); }(); + // auto txnRouterResourceYielder = std::make_unique(); + // txnRouterResourceYielder->yield(opCtx); + // ScopeGuard unyield{[&] { + // txnRouterResourceYielder->unyield(opCtx); + // }}; + auto recoveryShard = uassertStatusOK(shardRegistry->getShard(opCtx, recoveryShardId)); return uassertStatusOK(recoveryShard->runCommandWithFixedRetryAttempts( opCtx,