Spark Connector / SPARK-153

Data loss when loading data with any Mongo partitioner except MongoPaginateByCountPartitioner

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 2.1.1, 2.2.1
    • Affects Version/s: 2.2.0
    • Component/s: API
    • Labels: None

      When I use the mongo-spark connector to load data from MongoDB and the number of matching items is on the order of tens of thousands, data is lost.

      String startTime = yearString + monthString + "00000000";
      String endTime = yearString + monthString + "99000000";
      Document filterCondition = new Document().append(filterDateField,
              new Document("$gt", startTime).append("$lt", endTime));
      ReadConfig readConfig = ReadConfig.create(session);
      // Count the matching documents directly against the collection.
      long srcCount = MongoConnector.apply(readConfig.asOptions()).withCollectionDo(readConfig, Document.class,
              new Function<MongoCollection<Document>, Long>() {
                  private static final long serialVersionUID = 1L;

                  @Override
                  public Long call(MongoCollection<Document> v1) throws Exception {
                      return v1.count(filterCondition);
                  }
              });
      System.out.println("count in db : "+srcCount);
      // readConfig = readConfig.withOption("partitioner", MySamplePartitioner.class.getName());
      JavaMongoRDD<Document> mongoRdd = MongoSpark.load(jsc, readConfig);
      Document filterDoc = new Document("$match",filterCondition);
      JavaMongoRDD<Document> filterRdd = mongoRdd.withPipeline(Collections.singletonList(filterDoc));
      org.apache.spark.util.LongAccumulator counter = session.sparkContext().longAccumulator();
      filterRdd.foreach(new VoidFunction<Document>() {
          @Override
          public void call(Document document) throws Exception {
              counter.add(1);
          }
      });
      System.out.println("count by partitioner: "+counter.value());

      Console output:
      count in db : 231560
      count by partitioner: 199158
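
      The commented-out withOption line in the snippet above shows how a partitioner can be selected explicitly. Based on the summary's observation that only MongoPaginateByCountPartitioner did not lose data, here is a minimal Scala sketch of that workaround (assuming the connector accepts the short partitioner name for the "partitioner" read option; otherwise the fully qualified class name can be used):

      import org.apache.spark.api.java.JavaSparkContext
      import org.apache.spark.sql.SparkSession
      import org.bson.Document

      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.ReadConfig
      import com.mongodb.spark.rdd.api.java.JavaMongoRDD

      object PaginateByCountWorkaround {
        def main(args: Array[String]): Unit = {
          // Assumes spark.mongodb.input.uri (and related options) are already set
          // on the session, as in the reporter's setup.
          val session = SparkSession.builder().getOrCreate()
          val jsc = JavaSparkContext.fromSparkContext(session.sparkContext)

          // Explicitly pick the partitioner that, per the summary, did not lose data.
          val readConfig = ReadConfig.create(session)
            .withOption("partitioner", "MongoPaginateByCountPartitioner")

          val rdd: JavaMongoRDD[Document] = MongoSpark.load(jsc, readConfig)
          println("count by MongoPaginateByCountPartitioner: " + rdd.count())
        }
      }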
      Then I looked at the code of the default partitioner, MongoSamplePartitioner.
      It fetches the sample keys with an aggregate command,
      but the min key and the max key of the collection are not included in the samples.
      So the data between the min key and the smallest sample key were missing, and likewise the data between the largest sample key and the max key.
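
      To make the uncovered ranges concrete, here is a small standalone Scala sketch (illustration only, with made-up key values; not the connector's code) of partitions built solely from sampled boundary keys:

      // Illustration only: partition bounds derived solely from sampled boundary
      // keys leave uncovered ranges at both ends of the matching key space.
      object SampleBoundaryGap {
        def main(args: Array[String]): Unit = {
          // Hypothetical partition-key values of the documents that match the query.
          val keysInCollection: Seq[Int] = 1 to 100

          // Suppose these are the sampled keys kept as right-hand boundaries.
          val rightHandBoundaries: Seq[Int] = Seq(7, 23, 58, 91)

          // Without the collection's min/max keys, the partitions only cover
          // [7,23), [23,58), [58,91).
          val partitionRanges: Seq[(Int, Int)] =
            rightHandBoundaries.zip(rightHandBoundaries.tail)

          def covered(key: Int): Boolean =
            partitionRanges.exists { case (lo, hi) => key >= lo && key < hi }

          // Prints 1..6 and 91..100: documents below the smallest sampled key and
          // at/above the largest one are never read, matching the count gap above.
          println(s"uncovered keys: ${keysInCollection.filterNot(covered)}")
        }
      }

      Inserting the min and max keys as extra boundaries, as in the patch below, closes both gaps.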
      Then I added the boundary keys, and the data were loaded fully:

      val samples = connector.withCollectionDo(readConfig, { coll =>
        // (sample aggregation body not shown in the issue text)
      })
      val minKey = connector.withCollectionDo(readConfig, { coll =>
        // (min-key query not shown in the issue text)
      })
      val maxKey = connector.withCollectionDo(readConfig, { coll =>
        // (max-key query not shown in the issue text)
      })
      def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || (!matchQuery.isEmpty && i == count - 1)
      val rightHandBoundaries = samples.zipWithIndex.collect {
        case (field, i) if collectSplit(i) => field.get(partitionKey)
      }
      val addMinMax = matchQuery.isEmpty
      rightHandBoundaries.insert(0, minKey.head.get(partitionKey))
      rightHandBoundaries.insert(rightHandBoundaries.size, maxKey.head.get(partitionKey))
      val partitions = PartitionerHelper.createPartitions(partitionKey, rightHandBoundaries,
        PartitionerHelper.locations(connector), addMinMax)
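
      The bodies of the three collection lambdas above are not preserved in the issue text. As a rough sketch only (the helper name, projection, and sort are assumptions, not the reporter's actual code), the min/max boundary keys could be looked up with the MongoDB Java driver like this:

      import com.mongodb.client.MongoCollection
      import com.mongodb.client.model.{Projections, Sorts}
      import org.bson.BsonDocument

      // Sketch: fetch the single smallest (ascending = true) or largest matching
      // partition-key value, projected down to the partition key only.
      def boundaryKey(coll: MongoCollection[BsonDocument],
                      matchQuery: BsonDocument,
                      partitionKey: String,
                      ascending: Boolean): Seq[BsonDocument] = {
        val sort = if (ascending) Sorts.ascending(partitionKey) else Sorts.descending(partitionKey)
        Option(
          coll.find(matchQuery)
            .projection(Projections.include(partitionKey))
            .sort(sort)
            .limit(1)
            .first()
        ).toSeq // Seq so callers can use .head as in the patch above
      }

      On an empty result this returns an empty Seq, so the .head calls in the patch would need a guard (or .headOption) for collections with no matching documents.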

      So I hope you can fix this bug in the next version.
      Thanks a lot.

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Guoshun Qu (josen)
            Votes: 0
            Watchers: 2

              Created:
              Updated:
              Resolved: