diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 87d5a1a..93342b5 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -317,6 +317,22 @@ namespace { ReplClientInfo::forClient(txn->getClient()).setLastOp( slot.first ); } + void writeOpToOplog(OperationContext* txn, const BSONObj& op) { + if ( _localOplogCollection == 0 ) { + OldClientContext ctx(txn, rsOplogName); + _localDB = ctx.db(); + verify( _localDB ); + _localOplogCollection = _localDB->getCollection(rsOplogName); + massert(99999, + "local.oplog.rs missing. did you drop it? if so restart server", + _localOplogCollection); + } + Lock::CollectionLock collectionLock(txn->lockState(), "local.oplog.rs", MODE_IS); // ??? + WriteUnitOfWork wunit(txn); + _localOplogCollection->insertDocument(txn, op, false); + wunit.commit(); + } + OpTime writeOpsToOplog(OperationContext* txn, const std::deque& ops) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); @@ -368,6 +384,17 @@ namespace { return lastOptime; } + OpTime getLastOpTime(OperationContext* txn, const std::deque& ops) { + + BackgroundSync* bgsync = BackgroundSync::get(); + // Keep this up-to-date, in case we step up to primary. + long long hash = ops.back()["h"].numberLong(); + bgsync->setLastAppliedHash(hash); + + const OpTime lastOptime = extractOpTime(ops.back()); + return lastOptime; + } + void createOplog(OperationContext* txn) { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index b3a1f19..9f4dc2b 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -63,6 +63,12 @@ namespace repl { OpTime writeOpsToOplog(OperationContext* txn, const std::deque& ops); + // Updates global optime, and returns optime for the last op + OpTime getLastOpTime(OperationContext* txn, const std::deque& ops); + + // Write a single op to oplog; does not update optime + void writeOpToOplog(OperationContext* txn, const BSONObj& op); + extern std::string rsOplogName; extern std::string masterSlaveOplogName; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 350a737..43f03ec 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -177,6 +177,7 @@ namespace repl { Status status = applyOperationInLock(txn, db, op, convertUpdateToUpsert); incrementOpsAppliedStats(); + writeOpToOplog(txn, op); return status; }; @@ -285,64 +286,24 @@ namespace { prefetcherPool->join(); } - // Doles out all the work to the writer pool threads and waits for them to complete - void applyOps(const std::vector< std::vector >& writerVectors, + // Delegates all the work to the writer pool threads and waits for them to complete + void applyOps(SyncTail::OpQueue& ops, threadpool::ThreadPool* writerPool, SyncTail::MultiSyncApplyFunc func, SyncTail* sync) { TimerHolder timer(&applyBatchStats); - for (std::vector< std::vector >::const_iterator it = writerVectors.begin(); - it != writerVectors.end(); - ++it) { - if (!it->empty()) { - writerPool->schedule(func, boost::cref(*it), sync); - } + for (uint32_t workerId=0; workerIdschedule(func, boost::ref(ops), sync, workerId); } writerPool->join(); } - void fillWriterVectors(const std::deque& ops, - std::vector< std::vector >* writerVectors) { - - for (std::deque::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { - const BSONElement e = it->getField("ns"); - verify(e.type() == String); - const char* ns = e.valuestr(); - int len = e.valuestrsize(); - uint32_t hash = 0; - MurmurHash3_x86_32( ns, len, 0, &hash); - - const char* opType = it->getField( "op" ).valuestrsafe(); - - if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() && - isCrudOpType(opType)) { - BSONElement id; - switch (opType[0]) { - case 'u': - id = it->getField("o2").Obj()["_id"]; - break; - case 'd': - case 'i': - id = it->getField("o").Obj()["_id"]; - break; - } - - const size_t idHash = BSONElement::Hasher()( id ); - MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); - } - - (*writerVectors)[hash % writerVectors->size()].push_back(*it); - } - } - } // namespace // Doles out all the work to the writer pool threads and waits for them to complete // static OpTime SyncTail::multiApply(OperationContext* txn, - const OpQueue& ops, + OpQueue& ops, threadpool::ThreadPool* prefetcherPool, threadpool::ThreadPool* writerPool, MultiSyncApplyFunc func, @@ -358,9 +319,6 @@ namespace { prefetchOps(ops.getDeque(), prefetcherPool); } - std::vector< std::vector > writerVectors(replWriterThreadCount); - - fillWriterVectors(ops.getDeque(), &writerVectors); LOG(2) << "replication batch size is " << ops.getDeque().size() << endl; // We must grab this because we're going to grab write locks later. // We hold this mutex the entire time we're writing; it doesn't matter @@ -378,7 +336,7 @@ namespace { fassertFailed(28527); } - applyOps(writerVectors, writerPool, func, sync); + applyOps(ops, writerPool, func, sync); if (inShutdown()) { return OpTime(); @@ -390,7 +348,7 @@ namespace { txn->recoveryUnit()->goingToWaitUntilDurable(); } - OpTime lastOpTime = writeOpsToOplog(txn, ops.getDeque()); + OpTime lastOpTime = getLastOpTime(txn, ops.getDeque()); if (mustWaitUntilDurable) { txn->recoveryUnit()->waitUntilDurable(); @@ -404,6 +362,106 @@ namespace { return lastOpTime; } + // same logic as was in SyncTail::fillWriterVectors to assign ops to workers + // XXX tbd whether it makes sense to go through this code + // path at all for non-doclocking case; if not can simplify this + // e.g. maybe just use the _id and don't bother using ns or computing a hash? + // if does make sense, can split to two separate functions taking supportsDocLocking() out + uint32_t getOpHash(const BSONObj* op) { + const BSONElement e = op->getField("ns"); + verify(e.type() == String); + const char* ns = e.valuestr(); + int len = e.valuestrsize(); + uint32_t hash = 0; + MurmurHash3_x86_32( ns, len, 0, &hash); + const char* opType = op->getField( "op" ).valuestrsafe(); + if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() && + isCrudOpType(opType)) { + BSONElement id; + switch (opType[0]) { + case 'u': + id = op->getField("o2").Obj()["_id"]; + break; + case 'd': + case 'i': + id = op->getField("o").Obj()["_id"]; + break; + } + const size_t idHash = BSONElement::Hasher()( id ); + MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); + } + return hash; + } + + const BSONObj* SyncTail::OpQueue::next(uint32_t workerId, uint32_t (*hasher)(const BSONObj*)) { + + // exclusive access to main queue and worker queues + SimpleMutex::scoped_lock lk(_mtx); + + // lazily initialize _next + if (!_nextValid) { + _next = _deque.begin(); + _nextValid = true; + } + + // lazily allocate _workerQueues + // could preallocate, probably doesn't matter + if (workerId >= _workerQueues.size()) + _workerQueues.resize(workerId+1); + + // any work that was assigned to us by other workers? + WorkerQueue& myQueue = _workerQueues[workerId]; + if (myQueue.queue.size() > 0) { + // yes, so take the next one + const BSONObj* op = myQueue.queue.front(); + myQueue.queue.pop_front(); + return op; + } + + // no, so find an item from the main queue + while (true) { + + if (_next==_deque.end()) { + + // nothing more to do + return NULL; + + } else { + + // next op from main queue + const BSONObj* op = &*(_next++); + + // hash based on _id to determine if another worker + // is (probably) working on this _id + uint32_t hash = hasher(op); + + // is anyone else working on this _id? + // could use a map instead of scanning, but is + // small so this is probably good enough, + // maybe even faster + for (auto it = _workerQueues.begin(); it!=_workerQueues.end(); ++it) { + if (it->hash == hash) { + // yes, so assign it to them and go on to the next one + it->queue.push_back(op); + if (it->queue.size() > _maxWorkerQueueSize) { + _maxWorkerQueueSize = it->queue.size(); + log() << "xxx _maxQueueSize " << _maxWorkerQueueSize; + } + op = NULL; + break; + } + } + + // no, we can handle it + if (op) { + // claim future work on this _id + myQueue.hash = hash; + return op; + } + } + } + } + void SyncTail::oplogApplication(OperationContext* txn, const OpTime& endOpTime) { _applyOplogUntil(txn, endOpTime); } @@ -817,7 +875,7 @@ namespace { } // This free function is used by the writer threads to apply each op - void multiSyncApply(const std::vector& ops, SyncTail* st) { + void multiSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId) { initializeWriterThread(); OperationContextImpl txn; @@ -829,17 +887,18 @@ namespace { bool convertUpdatesToUpserts = true; - for (std::vector::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { + while(true) { + const BSONObj *op = ops.next(workerId, getOpHash); + if (!op) + break; try { - if (!SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts).isOK()) { + if (!SyncTail::syncApply(&txn, *op, convertUpdatesToUpserts).isOK()) { fassertFailedNoTrace(16359); } } catch (const DBException& e) { error() << "writer worker caught exception: " << causedBy(e) - << " on: " << it->toString(); + << " on: " << op->toString(); if (inShutdown()) { return; @@ -851,7 +910,7 @@ namespace { } // This free function is used by the initial sync writer threads to apply each op - void multiInitialSyncApply(const std::vector& ops, SyncTail* st) { + void multiInitialSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId) { initializeWriterThread(); OperationContextImpl txn; @@ -863,14 +922,15 @@ namespace { bool convertUpdatesToUpserts = false; - for (std::vector::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { + while (true) { + const BSONObj *op = ops.next(workerId, getOpHash); + if (!op) + break; try { - if (!SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts).isOK()) { + if (!SyncTail::syncApply(&txn, *op, convertUpdatesToUpserts).isOK()) { - if (st->shouldRetry(&txn, *it)) { - if (!SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts).isOK()) { + if (st->shouldRetry(&txn, *op)) { + if (!SyncTail::syncApply(&txn, *op, convertUpdatesToUpserts).isOK()) { fassertFailedNoTrace(15915); } } @@ -882,7 +942,7 @@ namespace { } catch (const DBException& e) { error() << "writer worker caught exception: " << causedBy(e) - << " on: " << it->toString(); + << " on: " << op->toString(); if (inShutdown()) { return; diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 6b549c4..28f923a 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -35,6 +35,7 @@ #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/stdx/functional.h" #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/concurrency/mutex.h" namespace mongo { @@ -51,8 +52,9 @@ namespace repl { */ class SyncTail { public: + class OpQueue; using MultiSyncApplyFunc = - stdx::function& ops, SyncTail* st)>; + stdx::function; /** * Type of function that takes a non-command op and applies it locally. @@ -105,25 +107,49 @@ namespace repl { class OpQueue { public: - OpQueue() : _size(0) {} - size_t getSize() const { return _size; } - const std::deque& getDeque() const { return _deque; } + OpQueue() : _mtx("OpQueue"), _size(0), _nextValid(false), _maxWorkerQueueSize(0) {} + size_t getSize() { return _size; } + std::deque& getDeque() { return _deque; } void push_back(BSONObj& op) { _deque.push_back(op); _size += op.objsize(); } - bool empty() const { + bool empty() { return _deque.empty(); } - BSONObj back() const { - invariant(!_deque.empty()); + BSONObj back() { + verify(!_deque.empty()); return _deque.back(); } + // returns next op for the specified worker thread + // this is threadsafe (other methods are not) + const BSONObj* next(uint32_t workerId, uint32_t (*hasher)(const BSONObj*)); + private: + + // serialize access to make next() threadsafe + SimpleMutex _mtx; + + // main queue of ops std::deque _deque; size_t _size; + + // use _next instead of pop_front so we can return + // a pointer instead of copying each op + std::deque::const_iterator _next; + bool _nextValid; + + // per-worker queues contain items assigned to us by another worker + // because we (probably) had work in progress on that document + class WorkerQueue { + public: + uint32_t hash; + std::deque queue; + }; + std::vector _workerQueues; + uint32_t _maxWorkerQueueSize; }; // returns true if we should continue waiting for BSONObjs, false if we should @@ -158,7 +184,7 @@ namespace repl { // Initial Sync and Sync Tail each use a different function. // Returns the last OpTime applied. static OpTime multiApply(OperationContext* txn, - const OpQueue& ops, + OpQueue& ops, threadpool::ThreadPool* prefetcherPool, threadpool::ThreadPool* writerPool, MultiSyncApplyFunc func, @@ -190,8 +216,8 @@ namespace repl { }; // These free functions are used by the thread pool workers to write ops to the db. - void multiSyncApply(const std::vector& ops, SyncTail* st); - void multiInitialSyncApply(const std::vector& ops, SyncTail* st); + void multiSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId); + void multiInitialSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId); } // namespace repl } // namespace mongo