From db03f27e0d9b2fd46f1f2a9ab7b381242ffb6817 Mon Sep 17 00:00:00 2001
From: Robert Sander
Date: Thu, 23 Jan 2025 15:33:09 +0000
Subject: [PATCH] repro

---
 repro_timeseries_bucket_commit.js       | 78 +++++++++++++++++++
 .../write_ops/timeseries_write_ops.cpp  |  4 +
 2 files changed, 82 insertions(+)
 create mode 100644 repro_timeseries_bucket_commit.js

diff --git a/repro_timeseries_bucket_commit.js b/repro_timeseries_bucket_commit.js
new file mode 100644
index 00000000000..42bc2dd9ec5
--- /dev/null
+++ b/repro_timeseries_bucket_commit.js
@@ -0,0 +1,78 @@
+import {assertArrayEq} from "jstests/aggregation/extras/utils.js";
+import {configureFailPoint} from "jstests/libs/fail_point_util.js";
+import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
+import {ShardingTest} from "jstests/libs/shardingtest.js";
+
+const st = new ShardingTest({mongos: 1, shards: 3});
+
+Random.setRandomSeed();
+
+const dbName = "test";
+const collName = "foo";
+const metaField = "mt";
+const timeField = "time";
+const testDB = st.s.getDB(dbName);
+
+function generateRandomTimestamp() {
+    const startTime = ISODate("2012-01-01T00:01:00.000Z");
+    const maxTime = ISODate("2015-12-31T23:59:59.000Z");
+    return new Date(Math.floor(Random.rand() * (maxTime.getTime() - startTime.getTime()) +
+                               startTime.getTime()));
+}
+
+function runInsertRandomTimeseriesWithIntermittentMoveCollection(orderedInsert, failpoint) {
+    testDB[collName].drop();
+
+    assert.commandWorked(testDB.createCollection(collName, {
+        timeseries: {
+            timeField: timeField,
+            metaField: metaField,
+            bucketMaxSpanSeconds: 900,
+            bucketRoundingSeconds: 900
+        }
+    }));
+
+    jsTest.log("Move collection to non-primary shard");
+    assert.commandWorked(
+        st.s.adminCommand({moveCollection: dbName + '.' + collName, toShard: st.shard1.shardName}));
+
+    let docs = [];
+    // Insert 100 documents at random times spanning 3 years (between 2012 and 2015). These dates
+    // were chosen arbitrarily.
+    for (let i = 0; i < 100; i++) {
+        docs.push({[timeField]: generateRandomTimestamp(), [metaField]: "location"});
+    }
+
+    let writeFP = configureFailPoint(st.rs1.getPrimary(), failpoint);
+
+    jsTest.log("Begin writes");
+    const awaitResult = startParallelShell(
+        funWithArgs(function(dbName, collName, docs, orderedInsert) {
+            let testDB = db.getSiblingDB(dbName);
+            assert.commandWorked(testDB[collName].insertMany(docs, orderedInsert));
+        }, dbName, collName, docs, orderedInsert), st.s.port);
+
+    jsTest.log("Wait for failpoint");
+    writeFP.wait();
+
+    jsTest.log("Move collection");
+    assert.commandWorked(
+        st.s.adminCommand({moveCollection: dbName + '.' + collName, toShard: st.shard2.shardName}));
+
+    jsTest.log("Release failpoint and wait for result");
+    writeFP.off();
+    awaitResult();
+
+    const insertedDocs = testDB[collName].find().toArray();
+    assertArrayEq({actual: insertedDocs, expected: docs, fieldsToSkip: ["_id"]});
+}
+
+assert.commandWorked(
+    st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
+
+runInsertRandomTimeseriesWithIntermittentMoveCollection(
+    {ordered: false}, 'hangCommitTimeseriesBucketBeforeCheckingTimeseriesCollection');
+// runInsertRandomTimeseriesWithIntermittentMoveCollection(
+//     {ordered: true}, 'hangCommitTimeseriesBucketsAtomicallyBeforeCheckingTimeseriesCollection');
+
+st.stop();
diff --git a/src/mongo/db/timeseries/write_ops/timeseries_write_ops.cpp b/src/mongo/db/timeseries/write_ops/timeseries_write_ops.cpp
index 4ea34bc5533..8e72ae5a31b 100644
--- a/src/mongo/db/timeseries/write_ops/timeseries_write_ops.cpp
+++ b/src/mongo/db/timeseries/write_ops/timeseries_write_ops.cpp
@@ -62,6 +62,8 @@ namespace {
 MONGO_FAIL_POINT_DEFINE(failAtomicTimeseriesWrites);
 MONGO_FAIL_POINT_DEFINE(failUnorderedTimeseriesInsert);
 MONGO_FAIL_POINT_DEFINE(hangInsertIntoBucketCatalogBeforeCheckingTimeseriesCollection);
+MONGO_FAIL_POINT_DEFINE(hangCommitTimeseriesBucketBeforeCheckingTimeseriesCollection);
+MONGO_FAIL_POINT_DEFINE(hangCommitTimeseriesBucketsAtomicallyBeforeCheckingTimeseriesCollection);
 MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeCommit);
 
 using TimeseriesBatches =
@@ -277,6 +279,7 @@ bool commitTimeseriesBucket(OperationContext* opCtx,
                             absl::flat_hash_map<int, int>& retryAttemptsForDup,
                             const mongo::write_ops::InsertCommandRequest& request) try {
     auto& bucketCatalog = bucket_catalog::GlobalBucketCatalog::get(opCtx->getServiceContext());
+    hangCommitTimeseriesBucketBeforeCheckingTimeseriesCollection.pauseWhileSet();
     auto metadata = getMetadata(bucketCatalog, batch->bucketId);
 
     auto catalog = CollectionCatalog::get(opCtx);
@@ -553,6 +556,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,
     }};
 
     try {
+        hangCommitTimeseriesBucketsAtomicallyBeforeCheckingTimeseriesCollection.pauseWhileSet();
         std::vector<mongo::write_ops::InsertCommandRequest> insertOps;
         std::vector<mongo::write_ops::UpdateCommandRequest> updateOps;
         auto catalog = CollectionCatalog::get(opCtx);
-- 
2.34.1