Add view handling driver to runAggregate()


    • Type: Task
    • Resolution: Duplicate
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: None
    • Query Integration

      Implement a single helper inside runAggregate() that loops over all LiteParsedDocumentSource (LPDS) stages, invokes each stage's view-policy callback, raises an error for any stage that disallows views, and prepends the view pipeline when no stage supplies custom behavior.

      Note that, initially, this helper will always just prepend the view pipeline, because every LPDS starts out with the default policy.

      Changes to LiteParsedPipeline will look like the following:

      class LiteParsedPipeline {
      public:
          void handleView(const ViewInfo& viewInfo) {
              auto aggregatePolicy = DefaultViewPolicy;
      
              for (const auto& stage : _stageSpecs) {
                 auto thisPolicy = stage->getViewPolicy();
      
                 std::visit(
                    OverloadedVisitor{
                       [&](LiteParsedDocumentSource::DefaultViewPolicy) {
                          aggregatePolicy = std::max(aggregatePolicy, thisPolicy);
                       },
                       [&](LiteParsedDocumentSource::ViewPolicyFn& fn) {
                          fn(viewInfo);
                          aggregatePolicy = std::max(aggregatePolicy, thisPolicy);
                       },
                       [&](LiteParsedDocumentSource::DisallowViewType& errorMsg) {
                          // errorMsg is expected to read like " is disallowed on views".
                          uasserted(ErrorCodes::CommandNotSupportedOnView,
                                    str::stream() << "Stage " << stage->getParseTimeName() << errorMsg);
                       }},
                    thisPolicy);
              }
      
              if (aggregatePolicy == DefaultViewPolicy) {
                 prependViewPipeline(viewInfo);
              }
          }
      
          void prependViewPipeline(const ViewInfo& viewInfo) {
              LiteParsedVec newStages;
              newStages.reserve(viewInfo.viewPipe.size() + _stageSpecs.size());
      
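              // Copy the view's effective pipeline first. viewInfo is a const reference, so
              // std::move below degrades to a copy (and will not compile for move-only
              // elements); take ViewInfo by value or rvalue reference to actually move.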
              for (auto& viewStage : viewInfo.viewPipe) {
                  newStages.push_back(std::move(viewStage));
              }
              // Append the existing user pipeline.
              for (auto& stage : _stageSpecs) {
                  newStages.push_back(std::move(stage));
              }
              _stageSpecs = std::move(newStages);
          }
      
      private:
          LiteParsedVec _stageSpecs;
      };
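
The dispatch described above can be tried in isolation with a small standalone model. This is only a sketch under stated assumptions: `Stage`, `Pipeline`, `Overloaded`, `stageNames()`, and the string-based `ViewInfo` are hypothetical stand-ins, not the server's real types, and a plain boolean flag replaces the `std::max` aggregation of policies. It also assumes `handleView` may take `ViewInfo` by value so that the view's stages can actually be moved.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical stand-ins for illustration; none of these are the real server types.
struct ViewInfo {
    std::vector<std::string> viewPipe;  // stage names of the view's own pipeline
};

struct DefaultViewPolicy {};                                // prepend the view pipeline
using ViewPolicyFn = std::function<void(const ViewInfo&)>;  // stage handles the view itself
struct DisallowViewType {                                   // stage may not run on a view
    std::string errorMsg;
};
using ViewPolicy = std::variant<DefaultViewPolicy, ViewPolicyFn, DisallowViewType>;

struct Stage {
    std::string name;
    ViewPolicy policy;
};

// Usual C++17 "overloaded lambdas" helper for std::visit.
template <class... Ts>
struct Overloaded : Ts... {
    using Ts::operator()...;
};
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;

class Pipeline {
public:
    explicit Pipeline(std::vector<Stage> stages) : _stages(std::move(stages)) {}

    // Taken by value so the view's stages can be moved into the pipeline.
    void handleView(ViewInfo viewInfo) {
        bool anyCustom = false;  // simplification of the std::max-based aggregate policy
        for (auto& stage : _stages) {
            std::visit(Overloaded{
                           [](DefaultViewPolicy&) {},
                           [&](ViewPolicyFn& fn) {
                               fn(viewInfo);
                               anyCustom = true;
                           },
                           [&](DisallowViewType& d) {
                               throw std::runtime_error("Stage " + stage.name + d.errorMsg);
                           }},
                       stage.policy);
        }
        // Only prepend when every stage used the default policy.
        if (!anyCustom) {
            prependViewPipeline(std::move(viewInfo));
        }
    }

    std::vector<std::string> stageNames() const {
        std::vector<std::string> names;
        for (const auto& s : _stages) names.push_back(s.name);
        return names;
    }

private:
    void prependViewPipeline(ViewInfo viewInfo) {
        std::vector<Stage> newStages;
        newStages.reserve(viewInfo.viewPipe.size() + _stages.size());
        // View stages first, then the existing user stages.
        for (auto& name : viewInfo.viewPipe) {
            newStages.push_back(Stage{std::move(name), DefaultViewPolicy{}});
        }
        for (auto& s : _stages) {
            newStages.push_back(std::move(s));
        }
        _stages = std::move(newStages);
    }

    std::vector<Stage> _stages;
};
```

In this model, a pipeline whose stages all carry `DefaultViewPolicy` ends up with the view's stages in front of its own. A stage carrying a `ViewPolicyFn` suppresses the prepend, and a `DisallowViewType` stage aborts with an exception, standing in for `uasserted`.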
      

            Assignee: Unassigned
            Reporter: Lynne Wang
            Votes: 0
            Watchers: 1
