diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp
index da521a6..d9e3874 100644
--- a/src/mongo/db/catalog/cursor_manager.cpp
+++ b/src/mongo/db/catalog/cursor_manager.cpp
@@ -324,16 +324,17 @@ CursorManager::~CursorManager() {
 }
 
 void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& reason) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    ExecutorRegistryPartitionGuard guardedRegistry(&_planExecutorRegistry);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
+
     fassert(28819, !BackgroundOperation::inProgForNs(_nss));
 
-    for (ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end();
-         ++it) {
-        // we kill the executor, but it deletes itself
-        PlanExecutor* exec = *it;
-        exec->kill(reason);
+    for (auto&& partition : _planExecutorRegistry.partitions) {
+        for (auto exec : partition.executors) {
+            exec->kill(reason);
+        }
+        partition.executors.clear();
     }
-    _nonCachedExecutors.clear();
 
     if (collectionGoingAway) {
         // we're going to wipe out the world
@@ -394,12 +395,13 @@ void CursorManager::invalidateDocument(OperationContext* txn,
         return;
     }
 
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    ExecutorRegistryPartitionGuard guardedRegistry(&_planExecutorRegistry);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
 
-    for (ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end();
-         ++it) {
-        PlanExecutor* exec = *it;
-        exec->invalidate(txn, dl, type);
+    for (auto&& partition : _planExecutorRegistry.partitions) {
+        for (auto exec : partition.executors) {
+            exec->invalidate(txn, dl, type);
+        }
     }
 
     for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
@@ -411,7 +413,7 @@ void CursorManager::invalidateDocument(OperationContext* txn,
 }
 
 std::size_t CursorManager::timeoutCursors(int millisSinceLastCall) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
 
     vector<ClientCursor*> toDelete;
 
@@ -431,19 +433,21 @@ std::size_t CursorManager::timeoutCursors(int millisSinceLastCall) {
     return toDelete.size();
 }
 
-void CursorManager::registerExecutor(PlanExecutor* exec) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
-    const std::pair<ExecSet::iterator, bool> result = _nonCachedExecutors.insert(exec);
+size_t CursorManager::registerExecutor(PlanExecutor* exec) {
+    size_t partition = _planExecutorRegistry.nextPartition();
+    ExecutorRegistryPartitionGuard guardedRegistry(&_planExecutorRegistry, partition);
+    const auto result = guardedRegistry->insert(exec);
     invariant(result.second);  // make sure this was inserted
+    return partition;
 }
 
-void CursorManager::deregisterExecutor(PlanExecutor* exec) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
-    _nonCachedExecutors.erase(exec);
+void CursorManager::deregisterExecutor(PlanExecutor* exec, size_t partition) {
+    ExecutorRegistryPartitionGuard guardedRegistry(&_planExecutorRegistry, partition);
+    guardedRegistry->erase(exec);
 }
 
 ClientCursor* CursorManager::find(CursorId id, bool pin) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
     CursorMap::const_iterator it = _cursors.find(id);
     if (it == _cursors.end())
         return NULL;
@@ -458,7 +462,7 @@ ClientCursor* CursorManager::find(CursorId id, bool pin) {
 }
 
 void CursorManager::unpin(ClientCursor* cursor) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
 
     invariant(cursor->isPinned());
     cursor->unsetPinned();
@@ -469,7 +473,7 @@ bool CursorManager::ownsCursorId(CursorId cursorId) const {
 }
 
 void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
 
     for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
         ClientCursor* cc = i->second;
@@ -478,7 +482,7 @@ void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
 }
 
 size_t CursorManager::numCursors() const {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
     return _cursors.size();
 }
 
@@ -494,19 +498,19 @@ CursorId CursorManager::_allocateCursorId_inlock() {
 
 CursorId CursorManager::registerCursor(ClientCursor* cc) {
     invariant(cc);
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
     CursorId id = _allocateCursorId_inlock();
     _cursors[id] = cc;
     return id;
 }
 
 void CursorManager::deregisterCursor(ClientCursor* cc) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
     _deregisterCursor_inlock(cc);
 }
 
 Status CursorManager::eraseCursor(OperationContext* txn, CursorId id, bool shouldAudit) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    stdx::lock_guard<SimpleMutex> lk(_cursorMapMutex);
 
     CursorMap::iterator it = _cursors.find(id);
     if (it == _cursors.end()) {
diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h
index 54d2a02..3bf2a3b 100644
--- a/src/mongo/db/catalog/cursor_manager.h
+++ b/src/mongo/db/catalog/cursor_manager.h
@@ -30,12 +30,17 @@
 
 #pragma once
 
+#include <boost/optional.hpp>
+#include <cstddef>
+#include <unordered_set>
+#include <vector>
 #include "mongo/db/clientcursor.h"
 #include "mongo/db/invalidation_type.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/record_id.h"
-#include "mongo/platform/unordered_set.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/mutex.h"
 #include "mongo/util/concurrency/mutex.h"
 
 namespace mongo {
@@ -84,13 +89,17 @@ public:
      * Register an executor so that it can be notified of deletion/invalidation during yields.
      * Must be called before an executor yields. If an executor is cached (inside a
      * ClientCursor) it MUST NOT be registered; the two are mutually exclusive.
+     *
+     * Returns a token which the caller must pass back to us in order to deregister the
+     * executor. See deregisterExecutor().
      */
-    void registerExecutor(PlanExecutor* exec);
+    size_t registerExecutor(PlanExecutor* exec);
 
     /**
-     * Remove an executor from the registry.
+     * Remove an executor from the registry. The value of 'registrationToken' must be the value
+     * given to the caller when 'exec' was registered with registerExecutor().
      */
-    void deregisterExecutor(PlanExecutor* exec);
+    void deregisterExecutor(PlanExecutor* exec, size_t registrationToken);
 
     // -----------------
 
@@ -143,19 +152,93 @@ public:
     static std::size_t timeoutCursorsGlobal(OperationContext* txn, int millisSinceLastCall);
 
 private:
+    struct ExecutorSet {
+        // Synchronizes access to 'executors'. Rather than locking this mutex directly, use the
+        // ExecutorRegistryPartitionGuard.
+        stdx::mutex mutex;
+
+        // Prefer to access via ExecutorRegistryPartitionGuard::operator->().
+        std::unordered_set<PlanExecutor*> executors;
+    };
+
+    // A partitioned data structure with which PlanExecutors are registered in order to receive
+    // notifications of events such as collection drops or invalidations. If the PlanExecutor is
+    // owned by a ClientCursor, it is instead registered in '_cursors'.
+    //
+    // In order to avoid a performance bottleneck, the executors are divided into kNumPartitions
+    // partitions, and access to each partition is synchronized separately. Locking of partitions
+    // should be done via the ExecutorRegistryPartitionGuard.
+    struct PartitionedExecutorRegistry {
+        static const size_t kNumPartitions = 256;
+
+        PartitionedExecutorRegistry() : partitions(kNumPartitions) {}
+
+        // Returns the index of the partition in the 'partitions' vector to which a new plan
+        // executor should be assigned.
+        size_t nextPartition() {
+            return _counter.fetchAndAdd(1) % kNumPartitions;
+        }
+
+        std::vector<ExecutorSet> partitions;
+
+    private:
+        AtomicUInt32 _counter;
+    };
+
+    // Used to protect access to either all partitions in the executor registry, or to protect
+    // access to a single partition.
+    //
+    // If also locking '_cursorMapMutex', the cursor map mutex must be acquired *after* acquiring
+    // this partition guard.
+    class ExecutorRegistryPartitionGuard {
+    public:
+        // Acquires locks for every partition.
+        ExecutorRegistryPartitionGuard(PartitionedExecutorRegistry* registry)
+            : _registry(registry) {
+            for (auto&& partition : registry->partitions) {
+                _lockGuards.emplace_back(stdx::unique_lock<stdx::mutex>(partition.mutex));
+            }
+        }
+
+        // Acquires the lock for the ith partition.
+        ExecutorRegistryPartitionGuard(PartitionedExecutorRegistry* registry, size_t partition)
+            : _registry(registry), _partition(partition) {
+            _lockGuards.emplace_back(
+                stdx::unique_lock<stdx::mutex>(registry->partitions[partition].mutex));
+        }
+
+        // Returns a pointer to the set of plan executors which this guard is guarding. Only valid
+        // to use if there is a single locked partition.
+        std::unordered_set<PlanExecutor*>* operator->() {
+            invariant(_lockGuards.size() == 1);
+            invariant(_partition);
+            invariant(_partition < _registry->partitions.size());
+            return &_registry->partitions[*_partition].executors;
+        }
+
+    private:
+        PartitionedExecutorRegistry* _registry;
+
+        boost::optional<size_t> _partition;
+
+        std::vector<stdx::unique_lock<stdx::mutex>> _lockGuards;
+    };
+
     CursorId _allocateCursorId_inlock();
     void _deregisterCursor_inlock(ClientCursor* cc);
 
     NamespaceString _nss;
     unsigned _collectionCacheRuntimeId;
-    std::unique_ptr<PseudoRandom> _random;
 
-    mutable SimpleMutex _mutex;
+    PartitionedExecutorRegistry _planExecutorRegistry;
 
-    typedef unordered_set<PlanExecutor*> ExecSet;
-    ExecSet _nonCachedExecutors;
+    // Synchronizes access to '_cursors' and '_random'. If also locking the _planExecutorRegistry,
+    // the ExecutorRegistryPartitionGuard must be acquired *before* taking this lock.
+    mutable SimpleMutex _cursorMapMutex;
 
     typedef std::map<CursorId, ClientCursor*> CursorMap;
     CursorMap _cursors;
+
+    std::unique_ptr<PseudoRandom> _random;
 };
 
 }
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 395e84c..2ce49e2 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -595,12 +595,11 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
 //   ScopedExecutorRegistration
 //
 
-// PlanExecutor::ScopedExecutorRegistration
 PlanExecutor::ScopedExecutorRegistration::ScopedExecutorRegistration(PlanExecutor* exec,
                                                                      const Collection* collection)
     : _exec(exec), _collection(collection) {
     invariant(_collection);
-    _collection->getCursorManager()->registerExecutor(_exec);
+    _registrationToken = _collection->getCursorManager()->registerExecutor(_exec);
 }
 
 PlanExecutor::ScopedExecutorRegistration::~ScopedExecutorRegistration() {
@@ -609,7 +608,7 @@
         // no longer exists.
         return;
     }
-    _collection->getCursorManager()->deregisterExecutor(_exec);
+    _collection->getCursorManager()->deregisterExecutor(_exec, _registrationToken);
 }
 
 }  // namespace mongo
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 09b8de7..c65c88b 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -391,8 +391,10 @@ private:
         ScopedExecutorRegistration(PlanExecutor* exec, const Collection* collection);
         ~ScopedExecutorRegistration();
 
+    private:
         PlanExecutor* const _exec;
         const Collection* const _collection;
+        size_t _registrationToken;
     };
 
     /**
diff --git a/src/mongo/dbtests/executor_registry.cpp b/src/mongo/dbtests/executor_registry.cpp
index c6fb21f..56091e4 100644
--- a/src/mongo/dbtests/executor_registry.cpp
+++ b/src/mongo/dbtests/executor_registry.cpp
@@ -94,21 +94,23 @@ public:
         return statusWithPlanExecutor.getValue().release();
     }
 
-    void registerExecutor(PlanExecutor* exec) {
+    size_t registerExecutor(PlanExecutor* exec) {
         WriteUnitOfWork wuow(&_opCtx);
-        _ctx->db()
-            ->getOrCreateCollection(&_opCtx, nss.ns())
-            ->getCursorManager()
-            ->registerExecutor(exec);
+        size_t registrationToken = _ctx->db()
+                                       ->getOrCreateCollection(&_opCtx, nss.ns())
+                                       ->getCursorManager()
+                                       ->registerExecutor(exec);
         wuow.commit();
+
+        return registrationToken;
     }
 
-    void deregisterExecutor(PlanExecutor* exec) {
+    void deregisterExecutor(PlanExecutor* exec, size_t registrationToken) {
         WriteUnitOfWork wuow(&_opCtx);
         _ctx->db()
             ->getOrCreateCollection(&_opCtx, nss.ns())
             ->getCursorManager()
-            ->deregisterExecutor(exec);
+            ->deregisterExecutor(exec, registrationToken);
         wuow.commit();
     }
 
@@ -147,7 +149,7 @@ public:
 
         // Register it.
         run->saveState();
-        registerExecutor(run.get());
+        size_t token = registerExecutor(run.get());
 
         // At this point it's safe to yield. forceYield would do that. Let's now simulate some
        // stuff going on in the yield.
@@ -158,7 +160,7 @@
         // At this point, we're done yielding. We recover our lock.
 
         // Unregister the runner.
-        deregisterExecutor(run.get());
+        deregisterExecutor(run.get(), token);
 
         // And clean up anything that happened before.
         run->restoreState();
@@ -189,13 +191,13 @@ public:
 
         // Save state and register.
         run->saveState();
-        registerExecutor(run.get());
+        size_t token = registerExecutor(run.get());
 
         // Drop a collection that's not ours.
         _client.dropCollection("unittests.someboguscollection");
 
         // Unregister and restore state.
-        deregisterExecutor(run.get());
+        deregisterExecutor(run.get(), token);
         run->restoreState();
 
         ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
@@ -203,13 +205,13 @@ public:
 
         // Save state and register.
         run->saveState();
-        registerExecutor(run.get());
+        token = registerExecutor(run.get());
 
         // Drop our collection.
         _client.dropCollection(nss.ns());
 
         // Unregister and restore state.
-        deregisterExecutor(run.get());
+        deregisterExecutor(run.get(), token);
         run->restoreState();
 
         // PlanExecutor was killed.
@@ -234,13 +236,13 @@ public:
 
         // Save state and register.
         run->saveState();
-        registerExecutor(run.get());
+        size_t token = registerExecutor(run.get());
 
         // Drop all indices.
         _client.dropIndexes(nss.ns());
 
         // Unregister and restore state.
-        deregisterExecutor(run.get());
+        deregisterExecutor(run.get(), token);
         run->restoreState();
 
         // PlanExecutor was killed.
@@ -265,13 +267,13 @@ public:
 
         // Save state and register.
         run->saveState();
-        registerExecutor(run.get());
+        size_t token = registerExecutor(run.get());
 
         // Drop a specific index.
         _client.dropIndex(nss.ns(), BSON("foo" << 1));
 
         // Unregister and restore state.
-        deregisterExecutor(run.get());
+        deregisterExecutor(run.get(), token);
         run->restoreState();
 
         // PlanExecutor was killed.
@@ -294,7 +296,7 @@ public:
 
         // Save state and register.
         run->saveState();
-        registerExecutor(run.get());
+        size_t token = registerExecutor(run.get());
 
         // Drop a DB that's not ours. We can't have a lock at all to do this as dropping a DB
         // requires a "global write lock."
@@ -303,7 +305,7 @@
         _ctx.reset(new OldClientWriteContext(&_opCtx, nss.ns()));
 
         // Unregister and restore state.
-        deregisterExecutor(run.get());
+        deregisterExecutor(run.get(), token);
         run->restoreState();
 
         ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
@@ -311,7 +313,7 @@
 
         // Save state and register.
         run->saveState();
-        registerExecutor(run.get());
+        token = registerExecutor(run.get());
 
         // Drop our DB. Once again, must give up the lock.
         _ctx.reset();
@@ -319,7 +321,7 @@
         _ctx.reset(new OldClientWriteContext(&_opCtx, nss.ns()));
 
         // Unregister and restore state.
-        deregisterExecutor(run.get());
+        deregisterExecutor(run.get(), token);
         run->restoreState();
 
         _ctx.reset();
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index b06793e..2167a55 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -171,21 +171,22 @@ public:
         return collection->getCursorManager()->numCursors();
     }
 
-    void registerExec(PlanExecutor* exec) {
+    size_t registerExec(PlanExecutor* exec) {
         // TODO: This is not correct (create collection under S-lock)
         AutoGetCollectionForRead ctx(&_txn, nss.ns());
         WriteUnitOfWork wunit(&_txn);
         Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, nss.ns());
-        collection->getCursorManager()->registerExecutor(exec);
+        size_t registrationToken = collection->getCursorManager()->registerExecutor(exec);
         wunit.commit();
+        return registrationToken;
     }
 
-    void deregisterExec(PlanExecutor* exec) {
+    void deregisterExec(PlanExecutor* exec, size_t registrationToken) {
         // TODO: This is not correct (create collection under S-lock)
         AutoGetCollectionForRead ctx(&_txn, nss.ns());
         WriteUnitOfWork wunit(&_txn);
         Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, nss.ns());
-        collection->getCursorManager()->deregisterExecutor(exec);
+        collection->getCursorManager()->deregisterExecutor(exec, registrationToken);
         wunit.commit();
     }
 
@@ -217,7 +218,7 @@ public:
         Collection* coll = ctx.getCollection();
 
         unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj));
-        registerExec(exec.get());
+        size_t token = registerExec(exec.get());
 
         BSONObj objOut;
         ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
@@ -228,7 +229,7 @@
         dropCollection();
 
         ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
 
-        deregisterExec(exec.get());
+        deregisterExec(exec.get(), token);
     }
 };
 
@@ -246,7 +247,7 @@ public:
         addIndex(indexSpec);
 
         unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
-        registerExec(exec.get());
+        size_t token = registerExec(exec.get());
 
         BSONObj objOut;
         ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
@@ -257,7 +258,7 @@
         dropCollection();
 
         ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
 
-        deregisterExec(exec.get());
+        deregisterExec(exec.get(), token);
     }
 };
 
@@ -298,7 +299,7 @@
         unique_ptr<PlanExecutor> outerExec = std::move(statusWithPlanExecutor.getValue());
 
         // Only the outer executor gets registered.
-        registerExec(outerExec.get());
+        size_t token = registerExec(outerExec.get());
 
         // Verify that both the "inner" and "outer" plan executors have been killed after
         // dropping the collection.
@@ -307,7 +308,7 @@
         ASSERT_EQUALS(PlanExecutor::DEAD, innerExec->getNext(&objOut, NULL));
         ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL));
 
-        deregisterExec(outerExec.get());
+        deregisterExec(outerExec.get(), token);
     }
 };
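
Reviewer note (illustrative, not part of the patch): the sketch below distills the partitioned-registry pattern that the cursor_manager.h changes above introduce. Each registrant is assigned round-robin to one of N partitions, each partition has its own mutex, the chosen partition index is handed back to the caller as a registration token so that deregistration only locks that one partition, and a guard can lock either a single partition or every partition (always in index order) for whole-registry events such as invalidateAll(). All names here (Registry, Guard, registerItem, deregisterItem) are hypothetical stand-ins; plain std::mutex, std::atomic and std::optional replace the mongo::stdx and AtomicUInt32 wrappers, and int* stands in for PlanExecutor*.

#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <optional>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for PartitionedExecutorRegistry.
class Registry {
public:
    static constexpr std::size_t kNumPartitions = 4;

    struct Partition {
        std::mutex mutex;                // guards 'items' only
        std::unordered_set<int*> items;  // registered objects (PlanExecutor* in the patch)
    };

    Registry() : partitions(kNumPartitions) {}

    // Round-robin partition choice; the returned index doubles as the registration token.
    std::size_t nextPartition() {
        return _counter.fetch_add(1) % kNumPartitions;
    }

    std::vector<Partition> partitions;

private:
    std::atomic<std::size_t> _counter{0};
};

// Hypothetical stand-in for ExecutorRegistryPartitionGuard: locks either every partition
// (for events that must see all registered items) or exactly one (for register/deregister).
class Guard {
public:
    // Lock all partitions, always in index order, so lock-all and lock-one callers
    // cannot deadlock against each other.
    explicit Guard(Registry* registry) : _registry(registry) {
        for (auto& partition : registry->partitions) {
            _locks.emplace_back(partition.mutex);
        }
    }

    // Lock only the partition named by the registration token.
    Guard(Registry* registry, std::size_t partition)
        : _registry(registry), _partition(partition) {
        _locks.emplace_back(registry->partitions[partition].mutex);
    }

    // Access the single locked partition's set; only meaningful in single-partition mode.
    std::unordered_set<int*>* operator->() {
        assert(_locks.size() == 1 && _partition.has_value());
        return &_registry->partitions[*_partition].items;
    }

private:
    Registry* _registry;
    std::optional<std::size_t> _partition;
    std::vector<std::unique_lock<std::mutex>> _locks;
};

// Registration returns the partition index as a token, mirroring the new
// CursorManager::registerExecutor(); the caller passes it back on deregistration.
std::size_t registerItem(Registry& registry, int* item) {
    std::size_t token = registry.nextPartition();
    Guard guard(&registry, token);
    guard->insert(item);
    return token;
}

void deregisterItem(Registry& registry, int* item, std::size_t token) {
    Guard guard(&registry, token);
    guard->erase(item);
}

int main() {
    Registry registry;
    int a = 0;
    std::size_t token = registerItem(registry, &a);

    {
        Guard all(&registry);  // "invalidate all": lock every partition, visit every item
        for (auto& partition : registry.partitions) {
            for (int* item : partition.items) {
                ++*item;  // stand-in for exec->kill(reason)
            }
        }
    }

    deregisterItem(registry, &a, token);
    return 0;
}

The registration token is what removes the need for a single registry-wide mutex on the hot register/deregister path, while the lock-all constructor preserves the old invalidateAll()/invalidateDocument() behavior of observing every registered executor at once.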