Spark Connector / SPARK-132

org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string>

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: Schema

      Hi, I use the mongo-spark-connector to save data to MongoDB.
      For the "_id" field I convert the String to an ObjectId, and the schema field is StructFields.objectId("_id", nullable = false).
      My code is:

          import org.apache.spark.rdd.RDD
          import org.apache.spark.sql.{Row, SparkSession}
          import org.apache.spark.sql.types.{DataTypes, IntegerType, StructField}
          import org.bson.types.ObjectId
          import com.mongodb.spark.sql.helpers.StructFields

          val ret: RDD[Row] = largeRDD.map(
            t => {
              val objectId = new ObjectId(t._1)

              // the raw ObjectId is stored directly in the Row
              Row(objectId, t._2)
            })

          val spark = SparkSession.builder.getOrCreate()

          val df = spark.createDataFrame(ret, DataTypes.createStructType(Array(
            StructFields.objectId("_id", nullable = false),
            StructField("f", IntegerType, nullable = true)
          )))
      

      but I got this error:
      Caused by: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string>

      User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 8, c3-hadoop-prc-st820.bj, executor 7): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string>
      named_struct(oid, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true)) AS _id#0
      +- named_struct(oid, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true))
      :- oid
      +- if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true)
      :- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)).isNullAt
      : :- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true))
      : : +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id)
      : : +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      : : +- input[0, org.apache.spark.sql.Row, true]
      : +- 0
      :- null
      +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType), true)
      +- validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid), StringType)
      +- getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true)), 0, oid)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id), StructField(oid,StringType,true))
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, _id)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      +- input[0, org.apache.spark.sql.Row, true]
      if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType) AS f#1
      +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType)
      :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
      : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      : : +- input[0, org.apache.spark.sql.Row, true]
      : +- 1
      :- null
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f), IntegerType)
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, f)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      +- input[0, org.apache.spark.sql.Row, true]
      if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true) AS d#2
      +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true)
      :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
      : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      : : +- input[0, org.apache.spark.sql.Row, true]
      : +- 2
      :- null
      +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d), StringType)
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, d)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      +- input[0, org.apache.spark.sql.Row, true]
      if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true) AS m#3
      +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true)
      :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
      : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      : : +- input[0, org.apache.spark.sql.Row, true]
      : +- 3
      :- null
      +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m), StringType)
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, m)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      +- input[0, org.apache.spark.sql.Row, true]
      if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true) AS s#4
      +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true)
      :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
      : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      : : +- input[0, org.apache.spark.sql.Row, true]
      : +- 4
      :- null
      +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s), StringType)
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 4, s)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      +- input[0, org.apache.spark.sql.Row, true]
      if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true) AS e#5
      +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true)
      :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
      : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      : : +- input[0, org.apache.spark.sql.Row, true]
      : +- 5
      :- null
      +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e), StringType)
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 5, e)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
      +- input[0, org.apache.spark.sql.Row, true]
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:293)
      at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
      at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
      at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
      at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1128)
      at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:153)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:152)
      at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
      at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
      at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
      at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
      at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
      at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
      at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:152)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:151)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1948)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1948)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string>
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfCondExpr$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
      ... 34 more
      Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:627)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1935)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1948)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1962)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
      at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:151)
      at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:87)
      at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
      at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:191)
      at com.xiaomi.infra.codelab.spark.Utils$.saveMSED(Utils.scala:143)
      at com.xiaomi.infra.codelab.spark.RoomCheck$.main(RoomCheck.scala:99)
      at com.xiaomi.infra.codelab.spark.RoomCheck.main(RoomCheck.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:651)
      Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.bson.types.ObjectId is not a valid external type for schema of struct<oid:string>
      ... (cause chain and stack trace identical to the executor failure above)
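
      The "Works as Designed" resolution follows from how the connector represents ObjectIds in Spark SQL: StructFields.objectId declares the Spark type struct<oid:string>, so when a DataFrame is built from an RDD[Row], the external value for "_id" must be a nested Row holding the hex id string, not a raw org.bson.types.ObjectId. Below is a minimal sketch of the corrected mapping, assuming (as in the report) that largeRDD is an RDD of (String, Int) pairs whose first element is a valid 24-character hex ObjectId string; the variable name "fixed" is mine:

          import org.apache.spark.rdd.RDD
          import org.apache.spark.sql.{Row, SparkSession}
          import org.apache.spark.sql.types.{DataTypes, IntegerType, StructField}
          import com.mongodb.spark.sql.helpers.StructFields

          // struct<oid:string> expects a nested Row wrapping the hex string,
          // so wrap t._1 instead of constructing an org.bson.types.ObjectId.
          // (Assumes t._1 is already a valid ObjectId hex string.)
          val fixed: RDD[Row] = largeRDD.map(t => Row(Row(t._1), t._2))

          val spark = SparkSession.builder.getOrCreate()

          val df = spark.createDataFrame(fixed, DataTypes.createStructType(Array(
            StructFields.objectId("_id", nullable = false),
            StructField("f", IntegerType, nullable = true)
          )))

      On save, the connector converts fields whose schema matches this helper struct back into BSON ObjectIds. When building a Dataset from case classes rather than Row objects, the com.mongodb.spark.sql.fieldTypes.ObjectId case class serves the same purpose.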
      

            Assignee:
            Ross Lawley (ross@mongodb.com)
            Reporter:
            Jacky (xmjacky)
            Votes:
            0
            Watchers:
            5

              Created:
              Updated:
              Resolved: