/**
 * Targets the shards for 'versionedNS' and establishes cursors on them, retrying when the attempt
 * fails.
 *
 * Each attempt fetches the collection routing info for 'versionedNS' from the catalog cache,
 * builds the per-shard requests from 'command', 'targetingQuery' and 'targetingCollation', and
 * calls establishCursors(). If the resulting status is a stale sharding error, the routing info
 * used for that attempt is handed to CatalogCache::onStaleConfigError(). At most
 * kMaxNumStaleVersionRetries attempts are made.
 *
 * Returns the responses from the successful attempt. If the routing info lookup reports
 * NamespaceNotFound, the loop is abandoned and an empty vector is returned.
 *
 * Throws (via uassertStatusOK) if the routing info lookup fails with any other error, or if the
 * last attempt did not produce an OK status.
 *
 * NOTE(review): 'db' and 'results' are not used anywhere in this body; callers only receive the
 * responses through the return value. Confirm whether 'results' is meant to be populated.
 */
StatusWith<std::vector<AsyncRequestsSender::Response>> establishCursorsWithShardVersioning(
    OperationContext* opCtx,
    const string& db,
    const BSONObj& command,
    int options,
    const string& versionedNS,
    const BSONObj& targetingQuery,
    const BSONObj& targetingCollation,
    std::vector<AsyncRequestsSender::Response>* results) {
    // Build readPref by extracting readPref from command and checking options for slaveOk.
    // NOTE(review): the initializer is elided ('...') in the original sketch and still has to be
    // filled in.
    auto readPref = ...;

    // Determine from 'options' and QueryOption_PartialResults if allowPartialResults should be
    // true.
    // NOTE(review): the initializer is elided ('...') in the original sketch and still has to be
    // filled in.
    bool allowPartialResults = ...;

    // Note, this will be the new shape of the vector of results returned by
    // Strategy::commandOp(). Each AsyncRequestsSender::Response contains the targeted shard's
    // shardId, exact targeted host, and error status or BSONObj command result. It is a strict
    // superset of what was returned before.
    //
    // Starts out as an OK status holding an empty vector, which is what gets returned if the
    // loop below is left through the NamespaceNotFound 'break'.
    StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses(
        (std::vector<AsyncRequestsSender::Response>()));

    int numAttempts = 0;
    do {
        // The routing info is looked up again on every attempt; the previous attempt may have
        // handed its copy to onStaleConfigError() below.
        auto routingInfoStatus =
            Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, versionedNS);
        if (ErrorCodes::NamespaceNotFound == routingInfoStatus) {
            // TODO: Does aggregation expect to get an empty vector of responses in this case?
            break;
        }
        auto routingInfo = uassertStatusOK(routingInfoStatus);

        auto requests = buildRequestsForShardsForQuery(
            opCtx, routingInfo, command, targetingQuery, targetingCollation);

        swResponses = establishCursors(
            opCtx,
            Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
            versionedNS,
            readPref,
            requests,
            allowPartialResults,
            nullptr /* TODO: can aggregation ever get CommandOnShardedViewNotSupportedOnMongod? */);

        if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
            Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
        }

        ++numAttempts;
        // NOTE(review): this condition retries on any non-OK status, not only on stale sharding
        // errors - confirm that is intended.
    } while (numAttempts < kMaxNumStaleVersionRetries && !swResponses.getStatus().isOK());

    // If we couldn't establish a shardVersion, throw. This will be caught at the top by
    // Strategy::clientCommandOp(), the way shardVersion errors thrown by
    // ParallelSortClusteredCursor are now.
    uassertStatusOK(swResponses.getStatus());

    // The status is known to be OK here, so return the StatusWith itself rather than copying the
    // response vector into a new one.
    return swResponses;
}