Summary:
A new metadata hook, CommittedOpTimeMetadataHook, will be added that checks for a "lastCommittedOpTime" field in the reply to every command sent from mongos. The hook should fire on all outgoing connections from mongos, both sharded and unsharded. When reading reply metadata, the hook will check whether a "lastCommittedOpTime" field exists and verify that it is a timestamp. It will then get the ShardRegistry from the Grid, try to parse the reply source string into a HostAndPort, look up the Shard object that contains that host in the registry, and call updateLastCommittedOpTime on that Shard with the received timestamp. Note: metadata hooks are given the replySource as a StringData, which is always a HostAndPort except when the reply comes through DBClientReplicaSet, which passes a full replica set ConnectionString.
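The two replySource formats mentioned above can be told apart before any ShardRegistry lookup. The following is a minimal standalone sketch, not MongoDB code: `HostPort`, `ReplSetConn` and `classifyReplySource` are hypothetical, simplified stand-ins for HostAndPort and ConnectionString. Unlike the design, which tries a HostAndPort parse first and falls back on FailedToParse, this sketch checks up front for the `/` that separates the set name from the host list. It also requires an explicit port, whereas the real HostAndPort also accepts a host without a port.

```cpp
#include <optional>
#include <string>
#include <string_view>
#include <variant>

// Hypothetical simplified stand-ins for mongo::HostAndPort and a replica set ConnectionString.
struct HostPort {
    std::string host;
    int port;
};
struct ReplSetConn {
    std::string setName;
    std::string hosts;  // Comma-separated "host:port" list, left unparsed in this sketch.
};

using ReplySource = std::variant<HostPort, ReplSetConn>;

// Classifies a reply source string; returns std::nullopt if neither form matches.
std::optional<ReplySource> classifyReplySource(std::string_view s) {
    // Replica set connection strings look like "setName/host1:port1,host2:port2".
    if (auto slash = s.find('/'); slash != std::string_view::npos) {
        if (slash == 0 || slash + 1 == s.size()) return std::nullopt;
        return ReplSetConn{std::string(s.substr(0, slash)), std::string(s.substr(slash + 1))};
    }
    // Otherwise expect a single "host:port" (this sketch requires an explicit port).
    auto colon = s.rfind(':');
    if (colon == std::string_view::npos || colon == 0 || colon + 1 == s.size()) {
        return std::nullopt;
    }
    int port = 0;
    for (char c : s.substr(colon + 1)) {
        if (c < '0' || c > '9') return std::nullopt;
        port = port * 10 + (c - '0');
        if (port > 65535) return std::nullopt;
    }
    return HostPort{std::string(s.substr(0, colon)), port};
}
```

A `HostPort` result corresponds to the getShardForHostNoReload path in the design, and a `ReplSetConn` result to the getShardNoReload lookup by set name.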
The hook will be added to the EgressMetadataHookLists that mongos gives to its global connection pool, its shard connection pool, and the Grid's task executor pool in src/mongo/s/server.cpp.
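Because each of these pools is given its own EgressMetadataHookList, the hook has to be registered once per list. The sketch below is a hypothetical, simplified model of a hook list forwarding every reply to each registered hook. `ReplyHook`, `HookList` and `RecordingHook` are illustrative names, and metadata is reduced to a string. The list stops at the first failing hook; whether the real EgressMetadataHookList behaves the same way should be checked against its source.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for rpc::EgressMetadataHook; returns true on success.
struct ReplyHook {
    virtual ~ReplyHook() = default;
    virtual bool readReplyMetadata(const std::string& replySource,
                                   const std::string& metadata) = 0;
};

// Hypothetical stand-in for rpc::EgressMetadataHookList.
class HookList {
public:
    void addHook(std::unique_ptr<ReplyHook> hook) { _hooks.push_back(std::move(hook)); }

    // Forwards the reply to every hook in registration order, stopping at the first failure.
    bool readReplyMetadata(const std::string& replySource, const std::string& metadata) {
        for (auto& hook : _hooks) {
            if (!hook->readReplyMetadata(replySource, metadata)) return false;
        }
        return true;
    }

private:
    std::vector<std::unique_ptr<ReplyHook>> _hooks;
};

// Records each reply source it sees; stands in for CommittedOpTimeMetadataHook.
struct RecordingHook : ReplyHook {
    explicit RecordingHook(std::vector<std::string>* seen) : _seen(seen) {}
    bool readReplyMetadata(const std::string& replySource, const std::string&) override {
        _seen->push_back(replySource);
        return true;
    }
    std::vector<std::string>* _seen;
};
```

Registering a `RecordingHook` in three separate lists, standing in for the global pool, the shard pool and the task executor pool, means a reply arriving through any of the three reaches a hook; a pool whose list lacks the hook would silently skip it.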
Note: the ReplicaSetMonitor (RSM) sends isMaster commands through a ScopedDBConnection, which draws from the globalConnectionPool. Currently, mongos adds hooks to the global connection pool through a ShardingConnectionHook, which does not attach reply-reading hooks when it is constructed with _shardedConnections = false. Because the global connection pool is used for unsharded connections, reply-reading metadata hooks are never attached to it, so the isMaster commands sent by the RSM currently do not fire reply-reading hooks (and therefore do not read clusterTime metadata either). Since the design depends on the RSM receiving lastCommittedOpTimes, ShardingConnectionHook will likely need to be refactored to allow attaching reply-reading hooks to unsharded connections as well. This is tracked by SERVER-33053.
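The gating described above can be modelled in a few lines. `ConnectionHookModel` is a hypothetical stand-in, not the real ShardingConnectionHook: it hands out a reply callback only when constructed with `shardedConnections = true`, so a pool built with `false` (like the global pool used by the RSM) never invokes the callback, even though one was supplied.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical, heavily simplified model of ShardingConnectionHook's gating behaviour.
class ConnectionHookModel {
public:
    using ReplyCallback = std::function<void(const std::string& replySource)>;

    ConnectionHookModel(bool shardedConnections, ReplyCallback onReply)
        : _shardedConnections(shardedConnections), _onReply(std::move(onReply)) {}

    // Called when a connection is created; returns the reply handler to attach.
    // An empty std::function means no reply-reading hook is attached to the connection.
    ReplyCallback onCreate() const {
        if (!_shardedConnections) return nullptr;
        return _onReply;
    }

private:
    bool _shardedConnections;
    ReplyCallback _onReply;
};
```

In this model, the change tracked by SERVER-33053 would amount to letting the unsharded case return the callback too; how the real class should express that is still one of the open questions.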
Design:
## CommittedOpTimeMetadataHook

```cpp
class CommittedOpTimeMetadataHook : public EgressMetadataHook {
public:
    explicit CommittedOpTimeMetadataHook(ServiceContext* service) : _service(service) {}

    Status writeRequestMetadata(OperationContext* opCtx, BSONObjBuilder* metadataBob) override {
        // This hook only reads reply metadata; nothing is attached to outgoing requests.
        return Status::OK();
    }

    Status readReplyMetadata(OperationContext* opCtx,
                             StringData replySource,
                             const BSONObj& metadataObj) override {
        auto lastCommittedOpTimeField = metadataObj[kLastCommittedOpTimeFieldName];
        if (lastCommittedOpTimeField.eoo()) {
            return Status::OK();
        }
        invariant(lastCommittedOpTimeField.type() == BSONType::bsonTimestamp);

        auto shardRegistry = Grid::get(_service)->shardRegistry();
        if (!shardRegistry) {
            return Status::OK();
        }

        auto shard = [&] {
            try {
                auto sourceHostAndPort = HostAndPort(replySource);
                return shardRegistry->getShardForHostNoReload(sourceHostAndPort);
            } catch (const ExceptionFor<ErrorCodes::FailedToParse>&) {
                // DBClientReplicaSet sends the replySource as a connection string, so we may
                // fail to parse it as a HostAndPort.
                auto connString =
                    uassertStatusOK(ConnectionString::parse(replySource.toString()));
                invariant(connString.type() == ConnectionString::SET);
                return shardRegistry->getShardNoReload(connString.getSetName());
            }
        }();

        if (shard) {
            shard->updateLastCommittedOpTime(LogicalTime(lastCommittedOpTimeField.timestamp()));
        }
        return Status::OK();
    }

private:
    ServiceContext* _service;
};
```
## src/mongo/s/server.cpp

```cpp
// Add CommittedOpTimeMetadataHook to the global connection pool and the shard connection pool.
static ExitCode runMongosServer() {
    ...
    auto unshardedHookList = stdx::make_unique<rpc::EgressMetadataHookList>();
    unshardedHookList->addHook(
        stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(getGlobalServiceContext()));

    globalConnPool.addHook(new ShardingConnectionHook(false, std::move(unshardedHookList)));

    auto shardedHookList = stdx::make_unique<rpc::EgressMetadataHookList>();
    shardedHookList->addHook(
        stdx::make_unique<rpc::LogicalTimeMetadataHook>(getGlobalServiceContext()));
    shardedHookList->addHook(
        stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(getGlobalServiceContext()));

    shardConnectionPool.addHook(new ShardingConnectionHook(true, std::move(shardedHookList)));
    ...
}

// Add CommittedOpTimeMetadataHook to the hook list used to build the Grid's task executor pool.
static Status initializeSharding(OperationContext* opCtx) {
    ...
    Status status = initializeShardingState(
        ...,
        [opCtx]() {
            auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
            hookList->addHook(
                stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(opCtx->getServiceContext()));
            return hookList;
        },
        ...);
    ...
}
```
Open Questions:
- DBClientReplicaSet triggers metadata hooks with the replySource set to a full replica set connection string instead of a HostAndPort. Should we catch failures to parse the replySource as a HostAndPort and fall back to parsing it as a ConnectionString, or try to refactor DBClientReplicaSet instead?
- How should ShardingConnectionHook be refactored to allow reply metadata hooks to run on unsharded connections (such as those used by the RSM)?
Issue Links:
- is related to: SERVER-33053 Allow unsharded connections to trigger EgressMetadataHook::readReplyMetadata through ShardingConnectionHook (Backlog)
- related to: SERVER-33765 Log (and parse) the shard identifier given to ShardRegistry::getShard(NoReload) (Closed)
- related to: SERVER-67901 Stop gossiping lastCommittedOpTime in reply metadata (Closed)