Spark Connector / SPARK-262

MongoPaginateBySizePartitioner generates duplicates

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • 2.4.2, 2.3.4, 2.2.8, 2.1.7
    • Affects Version/s: None
    • Component/s: None

      When using the MongoPaginateBySizePartitioner, some documents are duplicated in the dataset.

      Versions used:

      MongoDB 3.4.10
      Spark 2.4.0
      Mongo Spark connector 2.4.1

      Code to reproduce the problem:

      First, insert 2 million documents into a collection:

      import org.apache.spark.sql.{SaveMode, SparkSession}

      case class Data(_id: String, i: Double)

      object insert {
        def main(args: Array[String]): Unit = {
          val spark: SparkSession = SparkSession
            .builder()
            .master("local[4]")
            .getOrCreate()

          import spark.implicits._
          // Generate 2,000,000 documents with _id "1" .. "2000000"
          val twoMillion = spark.range(1L, 2000001L).map {
            i => Data(i.toString, i.toDouble)
          }
          twoMillion
            .write
            .mode(SaveMode.Overwrite)
            .format("com.mongodb.spark.sql")
            .options(
              Map("uri" -> "mongodb://******:27017/******",
                "collection" -> "test"))
            .save()
        }
      }
      

      This correctly inserts exactly 2 million documents:

      db.getCollection('test').count({}) // 2000000
      db.getCollection('test').find().limit(1) // {"_id" : "500001", "i" : 500001.0}
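
      As a side check (not part of the original report), reading the same collection back without the "partitioner" option, so the connector falls back to its default partitioner, can help confirm that the extra rows only appear with MongoPaginateBySizePartitioner. A minimal sketch, reusing the masked URI and collection name from the insert job above:

      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.ReadConfig
      import org.apache.spark.sql.SparkSession

      object readWithDefaultPartitioner {
        case class Data(_id: String, i: Double)
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[4]").getOrCreate()
          import spark.implicits._
          // No "partitioner" option: the connector uses its default partitioner
          val ds = MongoSpark.load(spark, ReadConfig(
            Map(
              "uri" -> "mongodb://******:27017/******",
              "collection" -> "test"
            )
          )).as[Data]
          // If the duplication is specific to MongoPaginateBySizePartitioner,
          // this should print 2000000
          println("count -> " + ds.count())
        }
      }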
      

      Then, read the collection using MongoPaginateBySizePartitioner:

      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.ReadConfig
      import org.apache.spark.sql.SparkSession

      object read {
        case class Data(_id: String, i: Double)
        def main(args: Array[String]): Unit = {
          val spark: SparkSession = SparkSession
            .builder()
            .master("local[4]")
            .getOrCreate()

          import spark.implicits._
          // Read the collection back, forcing 1 MB paginate-by-size partitions
          val ds = MongoSpark.load(spark, ReadConfig(
            Map(
              "uri" -> "mongodb://recload-mongo01.hon.2i.internal:27017/test-favi",
              "partitioner" -> "MongoPaginateBySizePartitioner",
              "partitionerOptions.partitionSizeMB" -> "1",
              "collection" -> "test"
            )
          )).as[Data]
          ds.createOrReplaceTempView("ds")
          println("count -> " + ds.count())
          // List any _id values that appear more than once
          spark.sql("select _id, count(*) from ds group by _id having count(*) >= 2").show()
        }
      }
      

      This prints:

      count -> 3081344
      +-------+--------+
      |    _id|count(1)|
      +-------+--------+
      | 100010|       2|
      |1000240|       2|
      |1000280|       2|
      |1000665|       2|
      |1000795|       2|
      |1000839|       2|
      |1000888|       2|
      | 100140|       2|
      |1001866|       2|
      |1002011|       2|
      |1002185|       2|
      | 100227|       2|
      |1002442|       2|
      | 100263|       2|
      |1002783|       2|
      |1002883|       2|
      |1002887|       2|
      | 100320|       2|
      |1003202|       2|
      |1003366|       2|
      +-------+--------+
      only showing top 20 rows
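
      To narrow down where the duplicates come from, a hypothetical follow-up (not run as part of this report) is to tag every row with the Spark partition it was read from and check whether the duplicated _id values show up in more than one partition, which would point at overlapping partition bounds. A sketch, assuming it is appended to the main method of the read job above (ds and spark in scope; the view name ds_with_partition is made up here):

      import org.apache.spark.sql.functions.spark_partition_id

      // Tag each row with its Spark partition index
      val withPartition = ds.withColumn("partition", spark_partition_id())
      withPartition.createOrReplaceTempView("ds_with_partition")

      // For each duplicated _id, list the partitions it appears in:
      // the same _id showing up in two partitions suggests overlapping bounds
      spark.sql(
        """select _id, collect_set(partition) as partitions, count(*) as n
          |from ds_with_partition
          |group by _id
          |having count(*) >= 2""".stripMargin).show(20, truncate = false)

      // Deduplicating on _id should recover the original 2,000,000 rows
      println("distinct _id count -> " + ds.dropDuplicates("_id").count())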
      

      Assignee: Ross Lawley (ross@mongodb.com)
      Reporter: Sacha Viscaino (sacha.viscaino@gadz.org)
      Votes: 0
      Watchers: 2

      Created:
      Updated:
      Resolved: