diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 04e1b10..da798c6 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -380,6 +380,20 @@ namespace { } } + void writeOpToOplog(OperationContext* txn, const BSONObj& op) { + if ( localOplogRSCollection == 0 ) { + Client::Context ctx(txn, rsoplog); + localDB = ctx.db(); + invariant( localDB ); + localOplogRSCollection = localDB->getCollection( rsoplog ); + massert(99999, "local.oplog.rs missing. did you drop it? if so restart server", localOplogRSCollection); + } + Lock::CollectionLock lk(txn->lockState(), "local.oplog.rs", MODE_IS); // ??? + WriteUnitOfWork wunit(txn); + localOplogRSCollection->insertDocument(txn, op, false); + wunit.commit(); + } + OpTime writeOpsToOplog(OperationContext* txn, const std::deque& ops) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); OpTime lastOptime = replCoord->getMyLastOptime(); @@ -440,6 +454,28 @@ namespace { } } + OpTime getLastOpTime(OperationContext* txn, const std::deque& ops) { + + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + invariant(!ops.empty()); + + ScopedTransaction transaction(txn, MODE_IX); // XXXXXX ??? + Lock::DBLock lk(txn->lockState(), "local", MODE_X); + + BackgroundSync* bgsync = BackgroundSync::get(); + // Keep this up-to-date, in case we step up to primary. + long long hash = ops.back()["h"].numberLong(); + bgsync->setLastAppliedHash(hash); + + const OpTime lastOpTime = ops.back()["ts"]._opTime(); + Client::Context ctx(txn, rsoplog, localDB); + ctx.getClient()->setLastOp(lastOpTime); + replCoord->setMyLastOptime(lastOpTime); + setNewOptime(lastOpTime); + return lastOpTime; + } + + void createOplog(OperationContext* txn) { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 07b0723..904727d 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -54,6 +54,12 @@ namespace repl { // Returns the optime for the last op inserted. OpTime writeOpsToOplog(OperationContext* txn, const std::deque& ops); + // Updates global optime, and returns optime for the last op + OpTime getLastOpTime(OperationContext* txn, const std::deque& ops); + + // Write a single op to oplog; does not update optime + void writeOpToOplog(OperationContext* txn, const BSONObj& op); + const char rsoplog[] = "local.oplog.rs"; static const int OPLOG_VERSION = 2; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 4b8c8db..400b77c 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -199,6 +199,10 @@ namespace repl { // to suppress errors when replaying oplog entries. bool ok = !applyOperation_inlock(txn, ctx.db(), op, true, convertUpdateToUpsert); opsAppliedStats.increment(); + + // insert op into our oplog + writeOpToOplog(txn, op); + return ok; } catch (const WriteConflictException&) { @@ -249,30 +253,24 @@ namespace repl { _prefetcherPool.join(); } - // Doles out all the work to the writer pool threads and waits for them to complete - void SyncTail::applyOps(const std::vector< std::vector >& writerVectors) { + // Delegates all the work to the writer pool threads and waits for them to complete + void SyncTail::applyOps(SyncTail::OpQueue& ops) { TimerHolder timer(&applyBatchStats); - for (std::vector< std::vector >::const_iterator it = writerVectors.begin(); - it != writerVectors.end(); - ++it) { - if (!it->empty()) { - _writerPool.schedule(_applyFunc, boost::cref(*it), this); - } + for (uint32_t workerId=0; workerId& ops) { + OpTime SyncTail::multiApply(OperationContext* txn, OpQueue& ops) { if (getGlobalEnvironment()->getGlobalStorageEngine()->isMmapV1()) { // Use a ThreadPool to prefetch all the operations in a batch. - prefetchOps(ops); + prefetchOps(ops.getDeque()); } - std::vector< std::vector > writerVectors(replWriterThreadCount); - fillWriterVectors(ops, &writerVectors); - LOG(2) << "replication batch size is " << ops.size() << endl; + LOG(2) << "replication batch size is " << ops.getSize() << endl; // We must grab this because we're going to grab write locks later. // We hold this mutex the entire time we're writing; it doesn't matter // because all readers are blocked anyway. @@ -289,54 +287,119 @@ namespace repl { fassertFailed(28527); } - applyOps(writerVectors); + applyOps(ops); if (inShutdown()) { return OpTime(); } - OpTime lastOpTime = writeOpsToOplog(txn, ops); + OpTime lastOpTime = getLastOpTime(txn, ops.getDeque()); BackgroundSync::get()->notify(txn); return lastOpTime; } - void SyncTail::fillWriterVectors(const std::deque& ops, - std::vector< std::vector >* writerVectors) { + // same logic as was in SyncTail::fillWriterVectors to assign ops to workers + // XXX tbd whether it makes sense to go through this code + // path at all for non-doclocking case; if not can simplify this + // e.g. maybe just use the _id and don't bother using ns or computing a hash? + // if does make sense, can split to two separate functions taking supportsDocLocking() out + uint32_t getOpHash(const BSONObj* op) { + const BSONElement e = op->getField("ns"); + verify(e.type() == String); + const char* ns = e.valuestr(); + int len = e.valuestrsize(); + uint32_t hash = 0; + MurmurHash3_x86_32( ns, len, 0, &hash); + const char* opType = op->getField( "op" ).valuestrsafe(); + if (getGlobalEnvironment()->getGlobalStorageEngine()->supportsDocLocking() && + isCrudOpType(opType)) { + BSONElement id; + switch (opType[0]) { + case 'u': + id = op->getField("o2").Obj()["_id"]; + break; + case 'd': + case 'i': + id = op->getField("o").Obj()["_id"]; + break; + } + const size_t idHash = BSONElement::Hasher()( id ); + MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); + } + return hash; + } + + const BSONObj* SyncTail::OpQueue::next(uint32_t workerId, uint32_t (*hasher)(const BSONObj*)) { - for (std::deque::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { - const BSONElement e = it->getField("ns"); - verify(e.type() == String); - const char* ns = e.valuestr(); - int len = e.valuestrsize(); - uint32_t hash = 0; - MurmurHash3_x86_32( ns, len, 0, &hash); - - const char* opType = it->getField( "op" ).valuestrsafe(); - - if (getGlobalEnvironment()->getGlobalStorageEngine()->supportsDocLocking() && - isCrudOpType(opType)) { - BSONElement id; - switch (opType[0]) { - case 'u': - id = it->getField("o2").Obj()["_id"]; - break; - case 'd': - case 'i': - id = it->getField("o").Obj()["_id"]; - break; + // exclusive access to main queue and worker queues + SimpleMutex::scoped_lock lk(_mtx); + + // lazily initialize _next + if (!_nextValid) { + _next = _deque.begin(); + _nextValid = true; + } + + // lazily allocate _workerQueues + // could preallocate, probably doesn't matter + if (workerId >= _workerQueues.size()) + _workerQueues.resize(workerId+1); + + // any work that was assigned to us by other workers? + WorkerQueue& myQueue = _workerQueues[workerId]; + if (myQueue.queue.size() > 0) { + // yes, so take the next one + const BSONObj* op = myQueue.queue.front(); + myQueue.queue.pop_front(); + return op; + } + + // no, so find an item from the main queue + while (true) { + + if (_next==_deque.end()) { + + // nothing more to do + return NULL; + + } else { + + // next op from main queue + const BSONObj* op = &*(_next++); + + // hash based on _id to determine if another worker + // is (probably) working on this _id + uint32_t hash = hasher(op); + + // is anyone else working on this _id? + // could use a map instead of scanning, but is + // small so this is probably good enough, + // maybe even faster + for (auto it = _workerQueues.begin(); it!=_workerQueues.end(); ++it) { + if (it->hash == hash) { + // yes, so assign it to them and go on to the next one + it->queue.push_back(op); + if (it->queue.size() > _maxWorkerQueueSize) { + _maxWorkerQueueSize = it->queue.size(); + log() << "xxx _maxQueueSize " << _maxWorkerQueueSize; + } + op = NULL; + break; + } } - const size_t idHash = BSONElement::Hasher()( id ); - MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); + // handle it if we didn't assign it to someone else + if (op) { + // claim future work on this _id + myQueue.hash = hash; + return op; + } } - - (*writerVectors)[hash % writerVectors->size()].push_back(*it); } } + void SyncTail::oplogApplication(OperationContext* txn, const OpTime& endOpTime) { _applyOplogUntil(txn, endOpTime); } @@ -384,7 +447,7 @@ namespace repl { bytesApplied += ops.getSize(); entriesApplied += ops.getDeque().size(); - const OpTime lastOpTime = multiApply(txn, ops.getDeque()); + const OpTime lastOpTime = multiApply(txn, ops); if (inShutdown()) { return; @@ -507,7 +570,7 @@ namespace { // if we should crash and restart before updating the oplog OpTime minValid = lastOp["ts"]._opTime(); setMinValid(&txn, minValid); - multiApply(&txn, ops.getDeque()); + multiApply(&txn, ops); } } @@ -634,7 +697,7 @@ namespace { } // This free function is used by the writer threads to apply each op - void multiSyncApply(const std::vector& ops, SyncTail* st) { + void multiSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId) { initializeWriterThread(); OperationContextImpl txn; @@ -644,17 +707,18 @@ namespace { bool convertUpdatesToUpserts = true; - for (std::vector::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { + while (true) { + const BSONObj *op = ops.next(workerId, getOpHash); + if (!op) + break; try { - if (!st->syncApply(&txn, *it, convertUpdatesToUpserts)) { + if (!st->syncApply(&txn, *op, convertUpdatesToUpserts)) { fassertFailedNoTrace(16359); } } catch (const DBException& e) { error() << "writer worker caught exception: " << causedBy(e) - << " on: " << it->toString(); + << " on: " << op->toString(); if (inShutdown()) { return; @@ -666,7 +730,7 @@ namespace { } // This free function is used by the initial sync writer threads to apply each op - void multiInitialSyncApply(const std::vector& ops, SyncTail* st) { + void multiInitialSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId) { initializeWriterThread(); OperationContextImpl txn; @@ -674,14 +738,15 @@ namespace { // allow us to get through the magic barrier txn.lockState()->setIsBatchWriter(true); - for (std::vector::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { + while (true) { + const BSONObj *op = ops.next(workerId, getOpHash); + if (!op) + break; try { - if (!st->syncApply(&txn, *it)) { + if (!st->syncApply(&txn, *op)) { - if (st->shouldRetry(&txn, *it)) { - if (!st->syncApply(&txn, *it)) { + if (st->shouldRetry(&txn, *op)) { + if (!st->syncApply(&txn, *op)) { fassertFailedNoTrace(15915); } } @@ -693,7 +758,7 @@ namespace { } catch (const DBException& e) { error() << "writer worker caught exception: " << causedBy(e) - << " on: " << it->toString(); + << " on: " << op->toString(); if (inShutdown()) { return; diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 1802a05..5f9434e 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -46,8 +46,11 @@ namespace repl { * "Normal" replica set syncing */ class SyncTail : public Sync { - typedef void (*MultiSyncApplyFunc)(const std::vector& ops, SyncTail* st); public: + + class OpQueue; + typedef void (*MultiSyncApplyFunc)(OpQueue& ops, SyncTail* st, uint32_t workerId); + SyncTail(BackgroundSyncInterface *q, MultiSyncApplyFunc func); virtual ~SyncTail(); virtual bool syncApply(OperationContext* txn, @@ -64,7 +67,7 @@ namespace repl { class OpQueue { public: - OpQueue() : _size(0) {} + OpQueue() : _mtx("OpQueue"), _size(0), _nextValid(false), _maxWorkerQueueSize(0) {} size_t getSize() { return _size; } std::deque& getDeque() { return _deque; } void push_back(BSONObj& op) { @@ -80,9 +83,33 @@ namespace repl { return _deque.back(); } + // returns next op for the specified worker thread + // this is threadsafe (other methods are not) + const BSONObj* next(uint32_t workerId, uint32_t (*hasher)(const BSONObj*)); + private: + + // serialize access to make next() threadsafe + SimpleMutex _mtx; + + // main queue of ops std::deque _deque; size_t _size; + + // use _next instead of pop_front so we can return + // a pointer instead of copying each op + std::deque::const_iterator _next; + bool _nextValid; + + // per-worker queues contain items assigned to us by another worker + // because we (probably) had work in progress on that document + class WorkerQueue { + public: + uint32_t hash; + std::deque queue; + }; + std::vector _workerQueues; + uint32_t _maxWorkerQueueSize; }; // returns true if we should continue waiting for BSONObjs, false if we should @@ -101,7 +128,7 @@ namespace repl { // Prefetch and write a deque of operations, using the supplied function. // Initial Sync and Sync Tail each use a different function. // Returns the last OpTime applied. - OpTime multiApply(OperationContext* txn, std::deque& ops); + OpTime multiApply(OperationContext* txn, OpQueue& ops); /** * Applies oplog entries until reaching "endOpTime". @@ -122,10 +149,8 @@ namespace repl { static void prefetchOp(const BSONObj& op); // Doles out all the work to the writer pool threads and waits for them to complete - void applyOps(const std::vector< std::vector >& writerVectors); + void applyOps(OpQueue& ops); - void fillWriterVectors(const std::deque& ops, - std::vector< std::vector >* writerVectors); void handleSlaveDelay(const BSONObj& op); // persistent pool of worker threads for writing ops to the databases @@ -136,8 +161,8 @@ namespace repl { }; // These free functions are used by the thread pool workers to write ops to the db. - void multiSyncApply(const std::vector& ops, SyncTail* st); - void multiInitialSyncApply(const std::vector& ops, SyncTail* st); + void multiSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId); + void multiInitialSyncApply(SyncTail::OpQueue& ops, SyncTail* st, uint32_t workerId); } // namespace repl } // namespace mongo