- Type: Bug
- Resolution: Cannot Reproduce
- Priority: Major - P3
- Affects Version/s: 2.2.1
- Component/s: Writes
- Environment: Apache Spark 2.2.1, YARN
Let's have a MongoDB collection requests with a unique index on the field field. Then, let's run this pseudo-code:
case class Request(field: String)

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import sparkSession.implicits._

// Three records, two of which collide on the unique index on "field".
val ds = sparkSession.createDataset(List(
  Request("one"),
  Request("one"),
  Request("two")
))

// db, col and uri stand for the target database, collection and connection string.
val writeConfig = new WriteConfig(db, col, connectionString = Some(uri))
  .withOption("maxBatchSize", "5000")

MongoSpark.save(ds.write.mode("append"), writeConfig)
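For reference, the unique index described above could be created with something like the following sketch. This is an assumption for illustration, not part of the original report; the database, collection and connection string are placeholders, and it uses the plain Java driver API bundled with the connector.

import com.mongodb.{MongoClient, MongoClientURI}
import com.mongodb.client.model.{IndexOptions, Indexes}

// Assumed setup: create the unique index on "field" before running the Spark job.
// "sampledb", "requests" and the URI below are placeholders.
val client = new MongoClient(new MongoClientURI("mongodb://localhost:27017"))
client.getDatabase("sampledb")
  .getCollection("requests")
  .createIndex(Indexes.ascending("field"), new IndexOptions().unique(true))
client.close()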
The save above does not fail. You will, however, see this error in the logs:
18/05/03 18:13:32 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 11.0 (TID 276, samplehost.com, executor 5): com.mongodb.MongoBulkWriteException: Bulk write operation error on server samplehost.com:27017. Write errors: [BulkWriteError{index=34, code=11000, message='E11000 duplicate key error collection: sampledb.requests index: field key: { : "one" }', details={ }}].
    at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
    at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:205)
    at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:146)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:188)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:422)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:413)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
    at com.mongodb.Mongo.execute(Mongo.java:845)
    at com.mongodb.Mongo$2.execute(Mongo.java:828)
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:338)
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:322)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:119)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:118)
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:118)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
In the database, the "two" record is not saved, yet the Spark job as a whole does not fail and appears to complete successfully. In my opinion, the job should fail.
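To make the silent data loss easier to see, a sketch like the following could compare what Spark attempted to write with what actually reached the collection. This is an illustration only, reusing the same placeholder db, col and uri from the pseudo-code above; it is not part of the original report.

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Sketch with assumed names: after MongoSpark.save(...) returns without error,
// read the collection back and compare counts.
val readConfig = ReadConfig(Map(
  "uri"        -> uri,   // same placeholder connection string as above
  "database"   -> db,
  "collection" -> col
))
val stored = MongoSpark.load(sparkSession, readConfig).count()
println(s"rows in dataset: ${ds.count()}, documents stored: $stored")
// With the unique index in place the counts differ, yet the save reported no failure.

Nothing in this check changes the reported behaviour; it only surfaces the discrepancy that the successful job status otherwise hides.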