Hi support,
When the documents contain only a few fields, Spark returns the correct row count, but when the documents contain a large number of fields, Spark returns a wrong row count. I think this is a critical issue. Please help check it.
1. Use Java to generate a JSON document with a large number of fields.
public class JsonB {
    public static void main(String[] args) {
        // Build one document containing the batchId used in the Spark $match below,
        // plus 500 filler fields so the document has a large number of columns.
        // (The filler field names here are illustrative.)
        StringBuilder sb = new StringBuilder();
        sb.append("{ 'batchId': '6910b1beaf944baaa2571f142cbff856'");
        for (int i = 0; i < 500; i++) {
            sb.append(", 'field").append(i).append("': 'value").append(i).append("'");
        }
        sb.append(" }");
        System.out.println(sb.toString());
    }
}
2. Create a text file (sky.txt) that inserts 20000 rows of data, pasting the document generated in step 1 into the insert call.
use bigdata
for (var i = 0; i < 20000; i++) {
    db.skyTest.insert( /* paste the JSON document generated in step 1 here */ )
}
3. Import data
mongo --host servername --port 27017 -u bigdata -p bigdata --authenticationDatabase bigdata < sky.txt
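As a sanity check before running Spark, the document count can be read directly through the MongoDB Java driver; below is a minimal sketch in Scala, assuming the mongo-java-driver 3.8+ (which provides MongoClients and countDocuments()) is on the classpath and the same connection string as step 3.

import com.mongodb.client.MongoClients

// Sanity-check sketch: count documents directly through the Java driver to
// confirm the import actually produced 20000 documents before involving Spark.
val client = MongoClients.create("mongodb://bigdata:bigdata@servername:27017/bigdata")
val serverCount = client.getDatabase("bigdata").getCollection("skyTest").countDocuments()
println("documents on server: " + serverCount) // expected: 20000
client.close()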
4. Use Spark to get the count.
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.bson.Document

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnector")
  .config("spark.mongodb.input.uri", "mongodb://bigdata:bigdata@servername:27017/bigdata")
  .config("spark.mongodb.output.uri", "mongodb://bigdata:bigdata@servername:27017/bigdata")
  .config("spark.mongodb.output.database", "bigdata")
  .getOrCreate()

val readConfigLT = ReadConfig(Map("uri" -> "mongodb://bigdata:bigdata@servername:27017/bigdata", "collection" -> "skyTest"))
val lTNotInRTDF = sparkSession.read.format("com.mongodb.spark.sql").options(readConfigLT.asOptions).load()

val rdd = MongoSpark.load(sparkSession.sparkContext, readConfigLT)
val aggregatedRdd = rdd.withPipeline(Seq(Document.parse(
  """{ $match: { "batchId" : { $eq : "6910b1beaf944baaa2571f142cbff856" } } }""")))
println("----------------------" + aggregatedRdd.count())
5. You will find the row count is not 20000; sometimes it is 17429, sometimes 16315, and so on.
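As an extra check, printing the plain DataFrame count next to the pipeline-based RDD count shows whether both read paths disagree with 20000; this is a minimal sketch reusing lTNotInRTDF and aggregatedRdd from step 4.

// Comparison sketch: both counts should be 20000 if every inserted document
// carries the batchId that the $match pipeline filters on.
println("DataFrame count:      " + lTNotInRTDF.count())
println("Aggregated RDD count: " + aggregatedRdd.count())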
Duplicated by: SPARK-151 "Issues with the SamplePartitioner" (Closed)