commit ad94bf8eb3c3f4e1a5b2ab022e6b3a48bc0fc2d2
|
Author: Kyle Suarez <kyle.suarez@mongodb.com>
|
Date: Tue Aug 14 12:35:29 2018 -0400
|
|
SERVER-36187 use IDL to serialize $out
|
|
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
|
index 8d8d3db49e..4d3f2a9bd3 100644
|
--- a/src/mongo/db/pipeline/document_source_out.cpp
|
+++ b/src/mongo/db/pipeline/document_source_out.cpp
|
@@ -146,14 +146,14 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
|
MONGO_UNREACHABLE;
|
}
|
|
-DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
|
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
- WriteModeEnum mode,
|
+DocumentSourceOut::DocumentSourceOut(const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
+ DocumentSourceOutSpec spec,
|
+ NamespaceString outputNs,
|
std::set<FieldPath> uniqueKey)
|
: DocumentSource(expCtx),
|
_done(false),
|
- _outputNs(outputNs),
|
- _mode(mode),
|
+ _spec(std::move(spec)),
|
+ _outputNs(std::move(outputNs)),
|
_uniqueKeyFields(std::move(uniqueKey)) {}
|
|
intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
|
@@ -168,17 +168,17 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
|
"$out cannot be used with a 'majority' read concern level",
|
readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern);
|
|
- auto mode = WriteModeEnum::kModeReplaceCollection;
|
- std::set<FieldPath> uniqueKey;
|
+ DocumentSourceOutSpec spec;
|
NamespaceString outputNs;
|
+ std::set<FieldPath> uniqueKey;
|
+
|
if (elem.type() == BSONType::String) {
|
+ spec.setTargetCollection(elem.str());
|
+ spec.setMode(WriteModeEnum::kModeReplaceCollection);
|
outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str());
|
uniqueKey.emplace("_id");
|
} else if (elem.type() == BSONType::Object) {
|
- auto spec =
|
- DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
|
-
|
- mode = spec.getMode();
|
+ spec = DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
|
|
// Convert unique key object to a vector of FieldPaths.
|
if (auto uniqueKeyObj = spec.getUniqueKey()) {
|
@@ -186,7 +186,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
|
} else {
|
// TODO SERVER-35954: If not present, build the unique key from the shard key of the
|
// output collection.
|
- uniqueKey.emplace("_id");
|
+ uniqueKey.emplace("_id"_sd);
|
}
|
|
// Retrieve the target database from the user command, otherwise use the namespace from the
|
@@ -206,36 +206,50 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
|
// during lite parsing, we need to do it here as well in case mongos is stale or the command is
|
// sent directly to the shard.
|
uassert(17017,
|
- str::stream() << "$out with mode " << WriteMode_serializer(mode)
|
+ str::stream() << "$out with mode " << WriteMode_serializer(spec.getMode())
|
<< " is not supported to an existing *sharded* output collection.",
|
- !(mode == WriteModeEnum::kModeReplaceCollection &&
|
+ !(spec.getMode() == WriteModeEnum::kModeReplaceCollection &&
|
expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs)));
|
|
uassert(17385, "Can't $out to special collection: " + outputNs.coll(), !outputNs.isSpecial());
|
|
- switch (mode) {
|
+ switch (spec.getMode()) {
|
case WriteModeEnum::kModeReplaceCollection:
|
- return new DocumentSourceOutReplaceColl(outputNs, expCtx, mode, uniqueKey);
|
+ return new DocumentSourceOutReplaceColl(
|
+ expCtx, std::move(spec), std::move(outputNs), std::move(uniqueKey));
|
case WriteModeEnum::kModeInsertDocuments:
|
- return new DocumentSourceOutInPlace(outputNs, expCtx, mode, uniqueKey);
|
+ return new DocumentSourceOutInPlace(
|
+ expCtx, std::move(spec), std::move(outputNs), std::move(uniqueKey));
|
case WriteModeEnum::kModeReplaceDocuments:
|
- return new DocumentSourceOutInPlaceReplace(outputNs, expCtx, mode, uniqueKey);
|
+ return new DocumentSourceOutInPlaceReplace(
|
+ expCtx, std::move(spec), std::move(outputNs), std::move(uniqueKey));
|
default:
|
MONGO_UNREACHABLE;
|
}
|
}
|
|
Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
|
- MutableDocument serialized(
|
- Document{{DocumentSourceOutSpec::kTargetCollectionFieldName, _outputNs.coll()},
|
- {DocumentSourceOutSpec::kTargetDbFieldName, _outputNs.db()},
|
- {DocumentSourceOutSpec::kModeFieldName, WriteMode_serializer(_mode)}});
|
- BSONObjBuilder uniqueKeyBob;
|
- for (auto path : _uniqueKeyFields) {
|
- uniqueKeyBob.append(path.fullPath(), 1);
|
+ // Serialize as much as possible using the IDL's built-in serializer.
|
+ BSONObjBuilder serialized;
|
+ BSONObjBuilder outStage(serialized.subobjStart(getSourceName()));
|
+ _spec.serialize(&outStage);
|
+
|
+ // If the "db" or "uniqueKey" options were omitted by the user in the original command, we
|
+ // fill them out explicitly ourselves.
|
+ if (!_spec.getTargetDb()) {
|
+ outStage.append(DocumentSourceOutSpec::kTargetDbFieldName, _outputNs.db());
|
}
|
- serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(uniqueKeyBob.done());
|
- return Value(Document{{getSourceName(), serialized.freeze()}});
|
+
|
+ if (!_spec.getUniqueKey()) {
|
+ BSONObjBuilder uniqueKey(outStage.subobjStart(DocumentSourceOutSpec::kUniqueKeyFieldName));
|
+ for (const auto& path : _uniqueKeyFields) {
|
+ uniqueKey.append(path.fullPath(), 1);
|
+ }
|
+ uniqueKey.doneFast();
|
+ }
|
+
|
+ outStage.doneFast();
|
+ return Value(serialized.obj());
|
}
|
|
DepsTracker::State DocumentSourceOut::getDependencies(DepsTracker* deps) const {
|
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
|
index 5168346957..eb6109ecb8 100644
|
--- a/src/mongo/db/pipeline/document_source_out.h
|
+++ b/src/mongo/db/pipeline/document_source_out.h
|
@@ -41,9 +41,9 @@ public:
|
static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse(
|
const AggregationRequest& request, const BSONElement& spec);
|
|
- DocumentSourceOut(const NamespaceString& outputNs,
|
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
- WriteModeEnum mode,
|
+ DocumentSourceOut(const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
+ DocumentSourceOutSpec spec,
|
+ NamespaceString outputNs,
|
std::set<FieldPath> uniqueKey);
|
|
virtual ~DocumentSourceOut() = default;
|
@@ -70,6 +70,9 @@ public:
|
return {this};
|
}
|
|
+ /**
|
+ * Returns the final namespace to which the aggregation's results will be sent.
|
+ */
|
const NamespaceString& getOutputNs() const {
|
return _outputNs;
|
}
|
@@ -89,7 +92,6 @@ public:
|
* Storage for a batch of BSON Objects to be inserted/updated to the write namespace. The
|
* extracted unique key values are also stored in a batch, used by $out with mode
|
* "replaceDocuments" as the query portion of the update.
|
- *
|
*/
|
struct BatchedObjects {
|
void emplace(BSONObj obj, BSONObj key) {
|
@@ -135,8 +137,11 @@ private:
|
bool _initialized = false;
|
bool _done = false;
|
|
+ // The original parsed $out specification from the user's command.
|
+ const DocumentSourceOutSpec _spec;
|
+
|
+ // The final namespace to which documents will be sent.
|
const NamespaceString _outputNs;
|
- WriteModeEnum _mode;
|
|
// Holds the unique key used for uniquely identifying documents. There must exist a unique index
|
// with this key pattern (up to order). Default is "_id" for unsharded collections, and "_id"
|