Spark Connector / SPARK-230

Data loss when exporting MongoDB documents

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Major - P3
    • Affects Version/s: 2.1.2, 2.1.5
    • Component/s: Partitioners
    • Environment:
      MongoDB server version: 3.6.6-1.4 (sharded cluster or replica set)
      3.18.6-2.el7.centos.x86_64

      Greetings,

      I have a sharded collection on MongoDB 3.6.6-1.4 with 45943325 total records. I use mongo-spark-connector 2.1.5 (I also tried v2.1.2) to export all records to HDFS, but the record count in HDFS comes out to 45906003. I know SPARK-153 fixed a data-loss issue, and I have confirmed that fix is in my build. I also tried the same collection on a replica set, before I converted the db/collection to sharding, and got the same issue.
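
      To make the count comparison concrete, here is a minimal sketch (illustrative only, not code from my job; compareCounts and its arguments are placeholders, and dirPath is the HDFS output directory used in the code further below):

      import com.mongodb.spark.MongoSpark;
      import org.apache.spark.api.java.JavaSparkContext;

      // Illustrative only: compare what the connector reads with what landed in HDFS.
      static void compareCounts(JavaSparkContext jsc, String dirPath) {
          long read = MongoSpark.load(jsc).count();      // documents seen through the connector
          long exported = jsc.textFile(dirPath).count(); // JSON lines written by saveAsTextFile
          System.out.printf("read=%d exported=%d missing=%d%n", read, exported, read - exported);
      }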

      This collection is sharded on the field "@id", which is a UUID string. I used mongo-hadoop v2.0.2 to do the same export; the HDFS record count was correct, with no data loss, but the MapReduce job ran for almost 20 hours because 3 mappers took a very long time (I don't know why). Below is the MapReduce splitter information for reference:
      19/01/10 21:51:01 INFO splitter.MongoCollectionSplitter: Created split: min={ "_id" : { "$minKey" : 1}}, max= { "_id" : { "$maxKey" : 1}}
      19/01/10 21:51:01 INFO splitter.MongoCollectionSplitter: Created split: min={ "@id" : { "$minKey" : 1}}, max= { "@id" : -9208592110459211922}
      19/01/10 21:51:01 INFO splitter.MongoCollectionSplitter: Created split: min={ "@id" : -9208592110459211922}, max= { "@id" : -9193674433217287092}
      ......
      19/01/10 21:51:03 INFO splitter.MongoCollectionSplitter: Created split: min={ "@id" : "https://sz.esf.leju.com/info/306186/"}, max= { "@id" : "https://sz.esf.leju.com/info/4471/"}
      19/01/10 21:51:04 INFO mapreduce.JobSubmitter: number of splits:20446
      Can you please take a look at my problem? The Spark code is very simple:

      import com.google.gson.Gson;
      import com.mongodb.spark.MongoSpark;
      import com.mongodb.spark.config.ReadConfig;
      import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.sql.SparkSession;
      import org.bson.Document;

      import java.util.HashMap;
      import java.util.Map;

      SparkSession spark = SparkSession.builder()
              .appName("MongoSparkConnectorRead")
              .config("spark.mongodb.input.uri", dbColUrl)
              .config("spark.mongodb.output.uri", dbColUrl)
              .getOrCreate();

      JavaSparkContext JSC = new JavaSparkContext(spark.sparkContext());

      // Read from secondaries when possible.
      Map<String, String> readOverrides = new HashMap<>();
      readOverrides.put("readPreference.name", "secondaryPreferred");
      ReadConfig readConfig = ReadConfig.create(JSC).withOptions(readOverrides);

      Gson GSON = new Gson();
      JavaMongoRDD<Document> rdd = MongoSpark.load(JSC, readConfig);

      // Drop _id and write each document as one JSON line to HDFS.
      rdd.coalesce(100)
              .map(doc -> {
                  doc.remove("_id");
                  return GSON.toJson(doc);
              })
              .saveAsTextFile(dirPath);
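
      Since the component here is the partitioner, here is a minimal sketch of pointing the connector at the same "@id" shard key that mongo-hadoop split on, assuming the 2.x ReadConfig option keys "partitioner" and "partitionerOptions.shardkey" and reusing JSC from above (I have not verified whether this changes the missing-record behaviour):

      // Sketch only: partition the read on the "@id" shard key, mirroring the
      // mongo-hadoop splits shown earlier. Option keys assume the 2.x ReadConfig naming.
      Map<String, String> partitionerOverrides = new HashMap<>();
      partitionerOverrides.put("partitioner", "MongoShardedPartitioner");
      partitionerOverrides.put("partitionerOptions.shardkey", "@id");

      ReadConfig shardedReadConfig = ReadConfig.create(JSC).withOptions(partitionerOverrides);
      JavaMongoRDD<Document> shardedRdd = MongoSpark.load(JSC, shardedReadConfig);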

      gradle:

      compile 'org.mongodb.spark:mongo-spark-connector_2.11:2.1.5'
      compile 'org.apache.spark:spark-core_2.11:2.1.3'
      compile 'org.apache.spark:spark-sql_2.11:2.1.3'
      compile group: 'org.mongodb.mongo-hadoop', name: 'mongo-hadoop-core', version: '2.0.2'

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Rui Yang (catslink)
            Votes: 0
            Watchers: 2
