Spark Connector / SPARK-162

Duplicate key error when inserting data from Spark into MongoDB

    • Type: Task
    • Resolution: Gone away
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: None
    • Labels: None
    • Environment:
      OS version: CentOS 7.2
      Spark version: 2.1.0.2.6.0.3-8
      MongoDB version: 3.4.6

      MongoSpark.save(result, writeConfig) is used to insert into a MongoDB sharded cluster, and the Documents being written do not specify an _id, so each one gets an auto-generated ObjectId:

      Code:

      JavaRDD<Document> result = sguColorBaseWeekSoldForecastValue.join(skuDistributionRatio)
              .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Tuple2<String, Float>>>, Document>() {
                  @Override
                  public Iterator<Document> call(Tuple2<Tuple2<String, String>, Tuple2<String, Tuple2<String, Float>>> tuple2Tuple2Tuple2) throws Exception {
                      List<Document> listResult = new ArrayList<>();

                      ....

                      return listResult.iterator();
                  }
              });

      String database = "sales_forecast";
      String tablename = "sku_flow_forecast_seasonal_csa";

      Map<String, String> writeOverrides = new HashMap<String, String>();
      writeOverrides.put("collection", tablename);
      writeOverrides.put("database", database);

      WriteConfig writeConfig = WriteConfig.create(sparkContext).withOptions(writeOverrides);
      MongoSpark.save(result, writeConfig);
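
      For context, a minimal sketch (not taken from this ticket) of the two ways an _id can end up on such a Document: when no _id is set, as in the job above, the Java driver assigns a newly generated ObjectId at insert time; alternatively, the code building the Documents could set an _id explicitly, for example one derived from the business key. The field names and values below are placeholders:

      import org.bson.Document;
      import org.bson.types.ObjectId;

      // No _id set: the Java driver adds an auto-generated ObjectId when the
      // document is inserted (this is what the job above relies on).
      Document implicitId = new Document("sku", "A123").append("week", "2018-01");

      // Explicit, deterministic _id built from assumed business fields, so the
      // same logical record always maps to the same _id (illustration only).
      Document deterministicId = new Document("_id", "A123:2018-01")
              .append("sku", "A123")
              .append("week", "2018-01");

      // Explicitly generated ObjectId, equivalent to what the driver would do.
      Document explicitObjectId = new Document("_id", new ObjectId())
              .append("sku", "A123")
              .append("week", "2018-01");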
      

      Error log:

      18/01/03 16:24:24 ERROR Executor: Exception in task 45.0 in stage 37.0 (TID 2822)
      com.mongodb.MongoBulkWriteException: Bulk write operation error on server 172.16.9.56:27017. Write errors: [BulkWriteError{index=1, code=11000, message='E11000 duplicate key error collection: sales_forecast.sku_flow_forecast_seasonal_csa index: _id_ dup key: { : ObjectId('5a4c93380f846a27cd143835') }', details={ }}]. 
      	at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
      	at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:205)
      	at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:146)
      	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:190)
      	at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
      	at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:230)
      	at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
      	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
      	at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
      	at com.mongodb.Mongo.execute(Mongo.java:781)
      	at com.mongodb.Mongo$2.execute(Mongo.java:764)
      	at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:323)
      	at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:311)
      	at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
      	at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:132)
      	at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:131)
      	at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
      	at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
      	at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
      	at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
      	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
      	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
      	at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
      	at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:131)
      	at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
      	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
      	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      	at org.apache.spark.scheduler.Task.run(Task.scala:99)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Su SuperSuJJ
            Votes: 0
            Watchers: 2
