When I use the mongo-spark connector to load data from MongoDB and the number of matching documents is on the order of tens of thousands or more, some data is lost.
String startTime = yearString + monthString + "00000000";
String endTime = yearString + monthString + "99000000";
Document filterCondition = new Document().append(filterDateField,
        new Document("$gt", startTime).append("$lt", endTime));
ReadConfig readConfig = ReadConfig.create(session);
// count the matching documents directly in MongoDB
long srcCount = MongoConnector.apply(readConfig.asOptions()).withCollectionDo(readConfig, Document.class,
        new Function<MongoCollection<Document>, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(MongoCollection<Document> v1) throws Exception {
                return v1.count(filterCondition);
            }
        });
System.out.println("count in db : " + srcCount);
// readConfig = readConfig.withOption("partitioner", MySamplePartitioner.class.getName());
JavaMongoRDD<Document> mongoRdd = MongoSpark.load(jsc, readConfig);
Document filterDoc = new Document("$match", filterCondition);
JavaMongoRDD<Document> filterRdd = mongoRdd.withPipeline(Collections.singletonList(filterDoc));
// count the documents actually delivered by the partitions
org.apache.spark.util.LongAccumulator counter = session.sparkContext().longAccumulator();
filterRdd.foreach(new VoidFunction<Document>() {
    @Override
    public void call(Document document) throws Exception {
        counter.add(1);
    }
});
System.out.println("count by partitioner: " + counter.value());
Console output:
count in db : 231560
count by partitioner: 199158
Then I looked at the code of the default partitioner, MongoSamplePartitioner. It fetches sample keys with an aggregate command, but the collection's min key and max key are not included in those samples. So the data between the min key and the smallest sample key is missing, and likewise the data between the largest sample key and the max key.
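To make the effect concrete, here is a small toy illustration (plain Java, not connector code; the key values and boundaries are made up) of what happens when the right-hand boundaries come only from sampled keys and no unbounded first/last partitions are added:

// toy illustration, not connector code: keys 0..999 exist in the collection,
// but the sampled right-hand boundaries only span [120, 870), so keys outside
// that range are never assigned to any partition
int total = 1000;
int[] sampledBoundaries = {120, 480, 870};
int first = sampledBoundaries[0];
int last = sampledBoundaries[sampledBoundaries.length - 1];
int covered = 0;
for (int key = 0; key < total; key++) {
    if (key >= first && key < last) {
        covered++;
    }
}
System.out.println("covered " + covered + " of " + total + " documents"); // covered 750 of 1000 documents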
Then I added the boundary keys, and the data was loaded in full: with the true minimum and maximum matching keys inserted as extra boundaries, the first and last partitions also cover the documents that fall outside the sampled range. My change:
val samples = connector.withCollectionDo(readConfig,
  { coll => /* $sample aggregation on the match query, sorted by partitionKey */ })
val minKey = connector.withCollectionDo(readConfig,
  { coll => /* find the smallest matching partitionKey value */ })
val maxKey = connector.withCollectionDo(readConfig,
  { coll => /* find the largest matching partitionKey value */ })

def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count - 1

val rightHandBoundaries = samples.zipWithIndex.collect {
  case (field, i) if collectSplit(i) => field.get(partitionKey)
}
val addMinMax = matchQuery.isEmpty

// add the true boundary keys so the first and last partitions cover the whole matching key range
rightHandBoundaries.insert(0, minKey.head.get(partitionKey))
rightHandBoundaries.insert(rightHandBoundaries.size, maxKey.head.get(partitionKey))

val partitions = PartitionerHelper.createPartitions(partitionKey, rightHandBoundaries, PartitionerHelper.locations(connector), addMinMax)
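Until the partitioner itself is fixed, a possible workaround on the application side (only a sketch, not something I have benchmarked; the partitioner name is taken from the connector documentation and should be checked against the connector version in use) is to avoid the sample-based partitioner through the same withOption call that is commented out in the code above:

// workaround sketch: switch to a partitioner that pages through the collection instead of sampling it
ReadConfig pagingConfig = readConfig.withOption("partitioner", "MongoPaginateBySizePartitioner");
JavaMongoRDD<Document> pagedRdd = MongoSpark.load(jsc, pagingConfig);
JavaMongoRDD<Document> pagedFilterRdd = pagedRdd.withPipeline(Collections.singletonList(filterDoc));

This avoids the $sample stage entirely; whether it is faster or slower than the sample partitioner will depend on the collection.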
I hope this bug can be fixed in the next version.
Thanks a lot.