Partitioner $project position can lead to poor performance


    • Type: Improvement
    • Resolution: Fixed
    • Priority: Unknown
    • Fix Version/s: 10.6.0
    • Affects Version/s: None
    • Component/s: None
    • Java Drivers
    • Not Needed

      Using the MongoDB Spark connector against a collection of around 400M documents has been observed to lead to poor performance, due to the placement of the $project stage in the generated aggregation pipeline.

      Spark Connector example:

      records_df = (
          spark.read.format("mongodb")
          .schema(flat_record_schema)
          .option("connection.uri", args.mongodb_uri)
          .option("database", args.mongodb_database)
          .option("collection", args.mongodb_records_collection)
          .option("batchSize", args.batch_size)
          .option("maxBatchSize", args.max_batch_size)
          .option(
              "partitioner",
              "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner",
          )
          .option("partitioner.options.partition.field", "record_id")
          .option("partitioner.options.max.number.of.partitions", "5000")
          .option("aggregation.pipeline", '[{"$match": {"record_type": "county_crim"}}]')
          .load()
      )

      The generated aggregation pipeline:

      [
        {'$match': { record_id: { '$gte': 'county_crim:599215b947b6c000380b941b' } }},
        { '$match': { record_type: 'county_crim' } },
        { '$project': { record_id: 1, _id: 0 } },
        { '$sort': { record_id: 1 } },
        { '$skip': 41254 },
        { '$limit': 1 }
      ] 

      The $project stage is in the wrong place: it should be the last stage in this pipeline. More details here: $project Stage Placement.
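      A fix along these lines can be sketched in Python: reorder the generated pipeline so the $project stage comes after $sort/$skip/$limit. The move_project_last helper below is illustrative only, not the connector's actual code; the pipeline mirrors the one shown above.

      ```python
      def move_project_last(pipeline):
          """Return a copy of `pipeline` with any $project stages moved to the
          end, preserving the relative order of all other stages."""
          projects = [stage for stage in pipeline if "$project" in stage]
          others = [stage for stage in pipeline if "$project" not in stage]
          return others + projects

      # The pipeline as generated by the partitioner (shown above):
      generated = [
          {"$match": {"record_id": {"$gte": "county_crim:599215b947b6c000380b941b"}}},
          {"$match": {"record_type": "county_crim"}},
          {"$project": {"record_id": 1, "_id": 0}},
          {"$sort": {"record_id": 1}},
          {"$skip": 41254},
          {"$limit": 1},
      ]

      fixed = move_project_last(generated)
      # $project is now the final stage, after $sort/$skip/$limit.
      assert "$project" in fixed[-1]
      ```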

       

      On my local environment, I was able to simulate this scenario with:

      # Aggregation pipeline
      pipeline = '[{"$match": {"country": "Canada"}}]'
      
      records_df = (
          spark.read.format("mongodb")
          .schema(flat_record_schema)  # optional
          .option("connection.uri", "mongodb://localhost:27017")
          .option("database", "article")
          .option("collection", "col1")
          .option("batchSize", 1000)
          .option("maxBatchSize", 5000)
          .option(
              "partitioner",
              "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner"
          )
          .option("partitioner.options.partition.field", "someid")
          .option("partitioner.options.max.number.of.partitions", "5000")
          .option("aggregation.pipeline", pipeline)
          .load()
      )
      

      This generated the following aggregation pipeline:

      [
          { "$match": { "someid": { "$gte": 5 } } },
          { "$match": { "country": "Canada" } },
          { "$project": { "someid": 1, "_id": 0 } },
          { "$sort": { "someid": 1 } },
          { "$skip": 1 },
          { "$limit": 1 }
      ] 

      The execution plan indicated that 4142 index keys were examined.

      However, if we rewrite the pipeline to:

      [
          { "$match": { "someid": { "$gte": 5 } } },
          { "$match": { "country": "Canada" } },
          { "$sort": { "someid": 1 } },
          { "$skip": 1 },
          { "$limit": 1 },
          { "$project": { "someid": 1, "_id": 0 } }
      ] 
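      As a sanity check that this reordering is behavior-preserving, here is a minimal pure-Python evaluator for the handful of stages involved ($match, $sort, $skip, $limit, and inclusion-only $project). It simulates the stage semantics on in-memory dicts and is not MongoDB itself; the sample documents are made up for illustration.

      ```python
      def evaluate(pipeline, docs):
          """Apply a small subset of aggregation stages to a list of dicts."""
          out = list(docs)
          for stage in pipeline:
              (op, arg), = stage.items()
              if op == "$match":
                  def matches(doc):
                      for field, cond in arg.items():
                          if isinstance(cond, dict):  # e.g. {"$gte": 5}
                              if not all(doc.get(field) >= v
                                         for o, v in cond.items() if o == "$gte"):
                                  return False
                          elif doc.get(field) != cond:
                              return False
                      return True
                  out = [d for d in out if matches(d)]
              elif op == "$sort":
                  # Apply keys in reverse so earlier keys take precedence.
                  for field, direction in reversed(list(arg.items())):
                      out.sort(key=lambda d: d[field], reverse=direction == -1)
              elif op == "$skip":
                  out = out[arg:]
              elif op == "$limit":
                  out = out[:arg]
              elif op == "$project":
                  keep = {f for f, v in arg.items() if v == 1}
                  out = [{f: d[f] for f in keep if f in d} for d in out]
          return out

      # Hypothetical sample data: odd someid values are "Canada".
      docs = [{"_id": i, "someid": i, "country": "Canada" if i % 2 else "US"}
              for i in range(1, 20)]

      original = [
          {"$match": {"someid": {"$gte": 5}}},
          {"$match": {"country": "Canada"}},
          {"$project": {"someid": 1, "_id": 0}},
          {"$sort": {"someid": 1}},
          {"$skip": 1},
          {"$limit": 1},
      ]
      # The same stages with $project moved to the end:
      rewritten = original[:2] + original[3:] + [original[2]]

      # Both orderings produce the identical result set.
      assert evaluate(original, docs) == evaluate(rewritten, docs)
      ```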
      

      The execution plan indicated that only 2 index keys were examined.

      Even though the result set does not change, having the $project earlier in the pipeline triggers this limitation: https://jira.mongodb.org/browse/SERVER-49306.

            Assignee: Ross Lawley
            Reporter: Ross Lawley
            Nabil Hachicha
            Votes: 1
            Watchers: 2

              Created:
              Updated:
              Resolved: