- Type: Bug
- Resolution: Works as Designed
- Priority: Major - P3
- Affects Version/s: None
- Component/s: Schema
Hi, I use the mongo-spark-connector to save data to MongoDB. For the "_id" field I convert the String to an ObjectId, and the schema field is StructFields.objectId("_id", nullable = false).
My code is:

val ret: RDD[Row] = largeRDD.map(t => {
  val objectId = new ObjectId(t._1)
  Row(objectId, t._2)
})
val sqlContext = SparkSession.builder.getOrCreate()
val df = sqlContext.createDataFrame(ret, DataTypes.createStructType(Array(
  StructFields.objectId("_id", nullable = false),
  StructField("f", IntegerType, nullable = true)
)))
But I got this error:
Caused by: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string>
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 8, c3-hadoop-prc-st820.bj, executor 7): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string> named_struct(oid, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true)) AS _id#0 +- named_struct(oid, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true)) :- oid +- if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true) :- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt : :- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)) : : +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id) : : +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 0 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true) +- validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType) +- getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, 
true], top level row object), 0, _id) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType) AS f#1 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 1 :- null +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true) AS d#2 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 2 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true) AS m#3 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row 
object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 3 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true) AS s#4 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 4 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true) AS e#5 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 5 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row 
object), 5, e), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:293) at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547) at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1128) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:153) at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:152) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154) at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184) at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:152) at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:151) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1948) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1948) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfCondExpr$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) ... 
34 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:627) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1935) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1948) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1962) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923) at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:151) at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:87) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:191) at com.xiaomi.infra.codelab.spark.Utils$.saveMSED(Utils.scala:143) at com.xiaomi.infra.codelab.spark.RoomCheck$.main(RoomCheck.scala:99) at com.xiaomi.infra.codelab.spark.RoomCheck.main(RoomCheck.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:651) Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string> named_struct(oid, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true)) AS _id#0 +- named_struct(oid, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true)) :- oid +- if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true) :- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt : :- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)) : : +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id) : : +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 0 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true) +- validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType) +- getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType) AS f#1 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, 
org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 1 :- null +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true) AS d#2 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 2 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true) AS m#3 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 3 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if 
(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true) AS s#4 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 4 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true) AS e#5 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] : +- 5 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true) +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType) +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e) +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) +- input[0, org.apache.spark.sql.Row, true] at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:293) at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547) at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1128) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:153) at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:152) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154) at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184) at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:152) at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:151) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1948) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1948) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfCondExpr$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) ... 34 more
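If I read the error correctly, the struct<oid:string> schema produced by StructFields.objectId expects each Row to carry a nested Row holding the ObjectId's 24-character hex string, not an org.bson.types.ObjectId instance. Below is a minimal sketch of that approach; the largeRDD: RDD[(String, Int)] input is the same as above, and the import path for StructFields (com.mongodb.spark.sql.helpers) plus the expectation that the connector maps the oid struct back to a BSON ObjectId on save are my assumptions.

// Sketch only: pass the hex string wrapped in a nested Row so it matches struct<oid:string>.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StructField}
import com.mongodb.spark.sql.helpers.StructFields  // assumed helper location

val spark = SparkSession.builder.getOrCreate()

// largeRDD: RDD[(String, Int)], keyed by the ObjectId hex string (as above).
val rows: RDD[Row] = largeRDD.map { case (hexId, f) =>
  Row(Row(hexId), f)  // nested Row(hexId) is the external value for struct<oid:string>
}

val schema = DataTypes.createStructType(Array(
  StructFields.objectId("_id", nullable = false),
  StructField("f", IntegerType, nullable = true)
))

val df = spark.createDataFrame(rows, schema)
// Saving df (e.g. with MongoSpark.save(df)) should then write _id as a real ObjectId,
// assuming the connector converts the oid struct back to BSON as I expect.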