Type: Task
Resolution: Gone away
Priority: Major - P3
Affects Version/s: None
Component/s: None
Labels: None
Environment: OS version: CentOS 7.2
Spark version: 2.1.0.2.6.0.3-8
MongoDB version: 3.4.6
Using MongoSpark.save(result, writeConfig) to insert into a MongoDB sharded cluster, where the Documents being written do not specify an _id (it is an auto-generated ObjectId):
Code:

JavaRDD<Document> result = sguColorBaseWeekSoldForecastValue
        .join(skuDistributionRatio)
        .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Tuple2<String, Float>>>, Document>() {
            @Override
            public Iterator<Document> call(Tuple2<Tuple2<String, String>, Tuple2<String, Tuple2<String, Float>>> tuple2Tuple2Tuple2) throws Exception {
                List<Document> listResult = new ArrayList<>();
                ....
                return listResult.iterator();
            }
        });

String database = "sales_forecast";
String tablename = "sku_flow_forecast_seasonal_csa";
Map<String, String> writeOverrides = new HashMap<String, String>();
writeOverrides.put("collection", tablename);
writeOverrides.put("database", database);
WriteConfig writeConfig = WriteConfig.create(sparkContext).withOptions(writeOverrides);
MongoSpark.save(result, writeConfig);
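A likely contributor to this pattern is that the driver assigns the generated ObjectId to each Document in place at insert time, so a failed-and-retried task that re-inserts the same Document objects collides on _id. As a possible workaround (a sketch only, not a confirmed fix): give each Document a deterministic _id derived from its business key and write with an upsert, so a retried task overwrites the earlier attempt instead of inserting a duplicate. The field names sku and week and the mongos address below are hypothetical placeholders:

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import org.bson.Document;

// Inside the flatMap above, give each document a deterministic _id, e.g.:
//     doc.put("_id", sku + "_" + week);   // hypothetical composite business key

result.foreachPartition(partition -> {
    // One client per partition; "mongos-host" stands in for the real mongos address.
    MongoClient client = new MongoClient("mongos-host", 27017);
    try {
        MongoCollection<Document> coll = client
                .getDatabase("sales_forecast")
                .getCollection("sku_flow_forecast_seasonal_csa");
        while (partition.hasNext()) {
            Document doc = partition.next();
            // replaceOne with upsert(true) is idempotent: a retried task
            // rewrites the same _id instead of inserting a second copy.
            coll.replaceOne(Filters.eq("_id", doc.get("_id")), doc,
                    new UpdateOptions().upsert(true));
        }
    } finally {
        client.close();
    }
});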
Error log:

18/01/03 16:24:24 ERROR Executor: Exception in task 45.0 in stage 37.0 (TID 2822)
com.mongodb.MongoBulkWriteException: Bulk write operation error on server 172.16.9.56:27017. Write errors: [BulkWriteError{index=1, code=11000, message='E11000 duplicate key error collection: sales_forecast.sku_flow_forecast_seasonal_csa index: _id_ dup key: { : ObjectId('5a4c93380f846a27cd143835') }', details={ }}].
    at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
    at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:205)
    at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:146)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:190)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:230)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
    at com.mongodb.Mongo.execute(Mongo.java:781)
    at com.mongodb.Mongo$2.execute(Mongo.java:764)
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:323)
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:311)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:132)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:131)
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:131)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
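For reference, one way to confirm that the colliding document already exists on the cluster (for example, written by an earlier attempt of the same task) is to query the offending _id from the E11000 message directly. A minimal sketch, assuming the same Java driver; "mongos-host" is a placeholder for the real router address:

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.bson.types.ObjectId;

public class CheckDuplicateId {
    public static void main(String[] args) {
        // The ObjectId below is taken from the duplicate key error above.
        MongoClient client = new MongoClient("mongos-host", 27017);
        try {
            MongoCollection<Document> coll = client
                    .getDatabase("sales_forecast")
                    .getCollection("sku_flow_forecast_seasonal_csa");
            long n = coll.count(Filters.eq("_id", new ObjectId("5a4c93380f846a27cd143835")));
            System.out.println("documents with the colliding _id: " + n);
        } finally {
            client.close();
        }
    }
}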