Implement explicit desugaring step for Pipelines with Desugar Stages

XMLWordPrintableJSON

    • Type: Task
    • Resolution: Unresolved
    • Priority: Major - P3
    • None
    • Affects Version/s: None
    • Component/s: None
    • Query Integration
    • None
    • None
    • None
    • None
    • None
    • None
    • None

      As outlined in the technical design, we will need to introduce an explicit desugaring step that will be called from the aggregation execution path immediately after query stats registration.

      SERVER-111644 implements both DocumentSourceExtensionExpandable & DocumentSourceExtensionOptimizable.

      As part of this ticket, we will add a generic expansion step in the query processing path, immediately after query stats entry registration.
      The logic for this should look something like :

      list<intrusive_ptr<DocumentSource>> expand(list<intrusive_ptr<DocumentSource>>& inputPipeline) {
        
        list<intrusive_ptr<DocumentSource>> expansion;
        for (auto entry : inputPipeline) {
           auto expansionList = entry->expand(); // returns a list.
           if (expansionList.empty()) {
               // empty list, add the stage itself.
               expansion.push_back(entry);
           } else {
               expansion.splice(expansion.end(), expansionList.begin(), expansionList,end()); 
           }
        }
        return expansion;
      } 

      We could also do the expansion in place (i.e splice in place), implement a class Expander/Desugarer (or some other appropriate name):

       

      // Desugarer accepts a Pipeline* in its constructor, which is the Pipeline that needs to be desugared. 
      class Desugarer {
      public:
      
      Desugarer(Pipeline* pipeline) : m_pipeline (pipeline), m_sources(pipeline->getSources) {}
      
      void operator() {
         try {
            auto itr = m_sources.begin();
            while (itr != m_sources.end()) {
                invariant(itr.get());
                itr = _desugar(itr, *itr.get());
            }
         } catch (...) {
           
         }
      }
      
      DocumentSourceContainer::iterator _desugar(DocumentSourceContainer::iterator, DocumentSource& stage) {
         invariant(itr->get() == &stage);
         // Non-Desugar stages just advance to the next stage in the pipeline.
         return std::next(itr);
      }
      
      
      DocumentSourceContainer::iterator _desugar(DocumentSourceContainer::iterator, DocumentSourceExtensionExpandable& stage) {
          invariant(itr->get() == &stage);
         
          auto desugaredPipeline = stage.expand();
          // replace desugar stage with its desugared pipeline
          if (!desugaredPipeline.empty()) {
             auto desugarIterator = itr;
             m_sources.insert(itr, desugaredPipeline.begin(), desugaredPipeline.end());
             itr = std::next(itr);
             m_sources.erase(desugarIterator);
          }
      
          return itr; 
      }
      
      private:
      Pipeline* m_pipeline;
      DocumentSourceContainer& m_sources;
      }
      

       

            Assignee:
            Josh Siegel
            Reporter:
            Santiago Roche
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

              Created:
              Updated: