Spark Connector / SPARK-154

Aggregation returns an incorrect row count when documents have many fields.

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 2.1.1, 2.2.1
    • Affects Version/s: 2.2.0
    • Component/s: API
    • Labels:
    • Environment:
      Spark 2.2.0
      MongoDB 3.4.7

      Hi support,

      When documents contain only a few fields, Spark returns the correct row count. When documents contain a large number of fields, Spark returns a wrong row count. I think this is a critical issue. Please help to investigate.

      1. Use Java to generate a JSON document.

      public class JsonB {

          public static void main(String[] args) {
              // Build one document with 500 string fields plus a batchId.
              StringBuilder sb = new StringBuilder();
              sb.append("{");
              for (int i = 0; i < 500; i++) {
                  sb.append("'aa").append(i).append("': 'abc',");
              }
              sb.append("'batchId': '6910b1beaf944baaa2571f142cbff856' }");
              System.out.println(sb.toString());
          }
      }

      2. Create a text file (sky.txt) that inserts 20000 documents.

      use bigdata
      for (var i = 0; i < 20000; i++) {
          db.skyTest.insert({above document})
      }
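
      Steps 1 and 2 can also be combined so that sky.txt is written in one go. The following is only a sketch: the class name SkyScript and its two helper methods are hypothetical and not part of the original report. It assumes the same field names, batchId, database and collection as above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper (not from the original report): writes sky.txt directly.
final class SkyScript {

    // Builds the step 1 document with the given number of 'aaN' fields.
    static String buildDocument(int fieldCount) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < fieldCount; i++) {
            sb.append("'aa").append(i).append("': 'abc',");
        }
        sb.append("'batchId': '6910b1beaf944baaa2571f142cbff856' }");
        return sb.toString();
    }

    // Wraps the document in the step 2 insert loop for the mongo shell.
    static String buildScript(String document, int rows) {
        return "use bigdata\n"
            + "for (var i = 0; i < " + rows + "; i++) {\n"
            + "  db.skyTest.insert(" + document + ")\n"
            + "}\n";
    }

    public static void main(String[] args) throws IOException {
        // 500 fields and 20000 rows match the values used in this report.
        String script = buildScript(buildDocument(500), 20000);
        Files.write(Paths.get("sky.txt"), script.getBytes(StandardCharsets.UTF_8));
    }
}
```

      Running the class writes sky.txt to the current directory, ready for the import in step 3.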

      3. Import the data.
      mongo --host servername --port 27017 -ubigdata -pbigdata --authenticationDatabase bigdata < sky.txt

      4. Use Spark to get the count.

      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.ReadConfig
      import org.apache.spark.sql.SparkSession
      import org.bson.Document

      val sparkSession = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnector")
        .config("spark.mongodb.input.uri", "mongodb://bigdata:bigdata@servername:27017/bigdata")
        .config("spark.mongodb.output.uri", "mongodb://bigdata:bigdata@servername:27017/bigdata")
        .config("spark.mongodb.output.database", "bigdata")
        .getOrCreate()

      val readConfigLT = ReadConfig(Map("uri" -> "mongodb://bigdata:bigdata@servername:27017/bigdata", "collection" -> "skyTest"))

      // DataFrame view of the same collection (not used for the count below).
      val lTNotInRTDF = sparkSession.read.format("com.mongodb.spark.sql").options(readConfigLT.asOptions).load()
      val rdd = MongoSpark.load(sparkSession.sparkContext, readConfigLT)

      // Apply a $match on batchId in MongoDB; every inserted document matches.
      val aggregatedRdd = rdd.withPipeline(Seq(Document.parse(
        """{ $match: { "batchId": { $eq: "6910b1beaf944baaa2571f142cbff856" } } }""")))

      println("----------------------" + aggregatedRdd.count())

      5. The reported row count is not 20000, and it varies between runs: sometimes 17429, sometimes 16315, and so on.

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: windshjw
            Votes: 0
            Watchers: 2
