Type: Improvement
Resolution: Fixed
Priority: Unknown
Affects Version/s: None
Component/s: None
Running the MongoDB Spark connector against a collection of around 400M documents has been observed to lead to poor performance, due to the placement of the $project stage in the generated aggregation pipeline.
Spark Connector example:
records_df = (
    spark.read.format("mongodb")
    .schema(flat_record_schema)
    .option("connection.uri", args.mongodb_uri)
    .option("database", args.mongodb_database)
    .option("collection", args.mongodb_records_collection)
    .option("batchSize", args.batch_size)
    .option("maxBatchSize", args.max_batch_size)
    .option(
        "partitioner",
        "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner",
    )
    .option("partitioner.options.partition.field", "record_id")
    .option("partitioner.options.max.number.of.partitions", "5000")
    .option("aggregation.pipeline", '[{"$match": {"record_type": "county_crim"}}]')
    .load()
)
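For context, one way to observe the pipelines the connector actually sends to the server is to enable the database profiler and inspect system.profile while the Spark job runs. This is only a minimal sketch, not necessarily how the pipeline below was captured; the connection string, database name ("records_db") and collection name ("records") are placeholders standing in for the args-based options above.

from pymongo import MongoClient

# Placeholders standing in for args.mongodb_uri, args.mongodb_database and
# args.mongodb_records_collection used in the Spark options above.
client = MongoClient("mongodb://localhost:27017")
db = client["records_db"]

# Profile every operation on this database (profiling level 2).
db.command("profile", 2)

# ... run the Spark read job against this database ...

# Inspect the aggregate commands the connector issued, including the
# per-partition pipelines it generated.
for op in db["system.profile"].find({"op": "command", "command.aggregate": "records"}):
    print(op["command"]["pipeline"])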
The aggregation pipeline generated:
[
{ "$match": { "record_id": { "$gte": "county_crim:599215b947b6c000380b941b" } } },
{ "$match": { "record_type": "county_crim" } },
{ "$project": { "record_id": 1, "_id": 0 } },
{ "$sort": { "record_id": 1 } },
{ "$skip": 41254 },
{ "$limit": 1 }
]
The $project stage is in the wrong place: it should be the last stage in this pipeline. More details here: $project Stage Placement.
On my local env I was able to simulate this scenario with:
# Aggregation pipeline
pipeline = '[{"$match": {"country": "Canada"}}]'

records_df = (
    spark.read.format("mongodb")
    .schema(flat_record_schema)  # optional
    .option("connection.uri", "mongodb://localhost:27017")
    .option("database", "article")
    .option("collection", "col1")
    .option("batchSize", 1000)
    .option("maxBatchSize", 5000)
    .option(
        "partitioner",
        "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner",
    )
    .option("partitioner.options.partition.field", "someid")
    .option("partitioner.options.max.number.of.partitions", "5000")
    .option("aggregation.pipeline", pipeline)
    .load()
)
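For reference, the local test data this repro assumes is just a small collection with someid and country fields and an index on someid (the partition/sort field). A minimal setup sketch; the document count and field values are made up:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
col = client["article"]["col1"]

# Made-up sample documents shaped like the repro: a numeric someid and a country.
col.insert_many(
    [{"someid": i, "country": "Canada" if i % 3 == 0 else "Brazil"} for i in range(1, 10001)]
)

# Index on the partition/sort field so the range $match and $sort can use an index scan.
col.create_index("someid")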
This generated the following aggregation pipeline:
[
{ "$match": { "someid": { "$gte": 5 } } },
{ "$match": { "country": "Canada" } },
{ "$project": { "someid": 1, "_id": 0 } },
{ "$sort": { "someid": 1 } },
{ "$skip": 1 },
{ "$limit": 1 }
]
The execution plan indicated that 4142 index keys were examined.
However, if we rewrite the pipeline to:
[
{ "$match": { "someid": { "$gte": 5 } } },
{ "$match": { "country": "Canada" } },
{ "$sort": { "someid": 1 } },
{ "$skip": 1 },
{ "$limit": 1 },
{ "$project": { "someid": 1, "_id": 0 } }
]
The execution plan indicated that only 2 index keys were examined.
Even though the result set does not change, having the $project stage earlier in the pipeline triggers this limitation: https://jira.mongodb.org/browse/SERVER-49306.
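For completeness, the keys-examined numbers above can be compared with an explain of the two pipeline orderings. This is a minimal sketch against the local article.col1 repro described earlier; note that the exact location of totalKeysExamined in the explain output varies by server version and plan shape.

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client["article"]

base = [
    {"$match": {"someid": {"$gte": 5}}},
    {"$match": {"country": "Canada"}},
]
tail = [{"$sort": {"someid": 1}}, {"$skip": 1}, {"$limit": 1}]
project = {"$project": {"someid": 1, "_id": 0}}

pipelines = {
    "project_before_sort": base + [project] + tail,  # order the connector generates
    "project_last": base + tail + [project],         # rewritten order
}

for name, pipeline in pipelines.items():
    plan = db.command(
        "explain",
        {"aggregate": "col1", "pipeline": pipeline, "cursor": {}},
        verbosity="executionStats",
    )
    # totalKeysExamined may be at the top level or nested under the first
    # $cursor stage, depending on the server version.
    stats = plan.get("executionStats") or plan["stages"][0]["$cursor"]["executionStats"]
    print(name, stats["totalKeysExamined"])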