Description
When a collection is read with MongoPaginateBySizePartitioner, some documents appear more than once in the resulting Dataset.
Versions used:

| Component             | Version |
|-----------------------|---------|
| MongoDB               | 3.4.10  |
| Spark                 | 2.4.0   |
| Mongo Spark connector | 2.4.1   |
Code to reproduce the problem:
First, insert 2 million documents into a collection:
/** Row shape written to MongoDB: a string `_id` and a numeric payload `i`. */
case class Data(_id: String, i: Double)

/**
 * Seeds the `test` collection with 2,000,000 documents whose `_id` values are
 * the strings "1" to "2000000" and whose `i` is the same number as a Double.
 *
 * NOTE(review): the original report omitted imports; this presumably needs
 * `org.apache.spark.sql.{SaveMode, SparkSession}` in scope.
 */
object insert {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[4]")
      .getOrCreate()

    try {
      import spark.implicits._

      // The end bound of spark.range is exclusive, so this yields ids 1 to 2,000,000.
      val twoMillion = spark.range(1L, 2000001L).map { i =>
        Data(i.toString, i.toDouble)
      }

      twoMillion
        .write
        .mode(SaveMode.Overwrite) // replace whatever the collection already holds
        .format("com.mongodb.spark.sql")
        .options(
          Map(
            "uri" -> "mongodb://******:27017/******",
            "collection" -> "test"))
        .save()
    } finally {
      // Release the local Spark context even if the write fails.
      spark.stop()
    }
  }
}
This correctly inserts exactly 2 million documents:

    db.getCollection('test').count({})        // 2000000
    db.getCollection('test').find().limit(1)  // {"_id" : "500001", "i" : 500001.0}

Then read the collection back using MongoPaginateBySizePartitioner:
/**
 * Reads the `test` collection back with MongoPaginateBySizePartitioner using
 * 1 MB partitions. It then prints the total row count and any `_id` that
 * occurs more than once. The inserted `_id`s are unique, so a correct read
 * gives a count of 2000000 and an empty duplicate table.
 *
 * NOTE(review): the original report omitted imports; this presumably needs
 * `org.apache.spark.sql.SparkSession`, `com.mongodb.spark.MongoSpark` and
 * `com.mongodb.spark.config.ReadConfig` in scope.
 */
object read {
  /** Row shape read back from MongoDB; mirrors the writer's `Data`. */
  case class Data(_id: String, i: Double)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[4]")
      .getOrCreate()

    try {
      import spark.implicits._ // encoder for .as[Data]

      val ds = MongoSpark.load(spark, ReadConfig(
        Map(
          // Must target the same database the insert program wrote to.
          "uri" -> "mongodb://******:27017/******",
          "partitioner" -> "MongoPaginateBySizePartitioner",
          "partitionerOptions.partitionSizeMB" -> "1",
          "collection" -> "test"
        )
      )).as[Data]

      ds.createOrReplaceTempView("ds")
      println(s"count -> ${ds.count()}")
      // Every row returned here is an _id that was read more than once.
      spark.sql("select _id, count(*) from ds group by _id having count(*) >= 2").show()
    } finally {
      // Release the local Spark context even if the read fails.
      spark.stop()
    }
  }
}
This prints a count of 3081344 instead of the expected 2000000, and the duplicate query returns `_id`s that were read twice:

    count -> 3081344
    +-------+--------+
    |    _id|count(1)|
    +-------+--------+
    | 100010|       2|
    |1000240|       2|
    |1000280|       2|
    |1000665|       2|
    |1000795|       2|
    |1000839|       2|
    |1000888|       2|
    | 100140|       2|
    |1001866|       2|
    |1002011|       2|
    |1002185|       2|
    | 100227|       2|
    |1002442|       2|
    | 100263|       2|
    |1002783|       2|
    |1002883|       2|
    |1002887|       2|
    | 100320|       2|
    |1003202|       2|
    |1003366|       2|
    +-------+--------+
    only showing top 20 rows