I want to use Spark to insert DBRefs, but I find that only 24-character hexadecimal ObjectId strings can be inserted as the $id.
Code sample:
{code:scala}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object MongoNestedDocTest {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnector")
      .config("spark.mongodb.input.uri", "mongodb://bigdata:bigdata@servername:27017/bigdata")
      .config("spark.mongodb.output.uri", "mongodb://bigdata:bigdata@servername:27017/bigdata")
      .config("spark.mongodb.output.database", "bigdata")
      .getOrCreate()

    val nestedDBRefRDD = sparkSession.sparkContext.parallelize(
      Row("Warsaw", Array(Row("people", Row("100")), Row("people", Row("200")))) ::
      Row("Corted", Array(Row("people", Row("100")), Row("people", Row("200")))) :: Nil)

    val nestedDBRefSchemaRes = StructType(
      StructField("userName", StringType, false) ::
      StructField("dbRefId", ArrayType(StructType(
        StructField("$ref", StringType, true) ::
        StructField("$id", StructType(StructField("oid", StringType, true) :: Nil), false) :: Nil))
      ) :: Nil)

    val nestedDBRefDF = sparkSession.createDataFrame(nestedDBRefRDD, nestedDBRefSchemaRes)
    nestedDBRefDF.show()
    nestedDBRefDF.write.option("collection", "rddMerge").mode("overwrite").format("com.mongodb.spark.sql").save()
  }
}
{code}
You will see the following exception:
{color:red}Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast [100] into a StructType(StructField(oid,StringType,true))
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToBsonValue(MapFunctions.scala:92)
at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocument$1.apply(MapFunctions.scala:59)
at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocument$1.apply(MapFunctions.scala:57)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.mongodb.spark.sql.MapFunctions$.rowToDocument(MapFunctions.scala:57)
at com.mongodb.spark.sql.MapFunctions$$anonfun$8.apply(MapFunctions.scala:177){color}
If you change the data to 24-character hexadecimal strings such as below, it works fine:
{code:scala}
val nestedDBRefRDD = sparkSession.sparkContext.parallelize(
  Row("Warsaw", Array(Row("people", Row("59b61d2487aa56da25cb9034")), Row("people", Row("599fccd2b9a88e237cb1597c")))) ::
  Row("Corted", Array(Row("people", Row("599fcd76b9a88e237cb1597f")), Row("people", Row("599fccffb9a88e237cb1597d")))) :: Nil)
{code}
I really don't want to be forced to use such a long ObjectId as the DBRef $id. Is this a bug? Could you do me a favor and take a look?
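In the meantime, here is a sketch of the workaround I am experimenting with: building org.bson.Document values by hand and saving them with MongoSpark.save, so the $id can be a plain short string via com.mongodb.DBRef. This is only a sketch under my assumptions: that the connector's Document-based save path can encode com.mongodb.DBRef values through the driver's default codec registry, and the collection name rddMergeWorkaround is just a placeholder.
{code:scala}
import com.mongodb.DBRef
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.SparkSession
import org.bson.Document

import scala.collection.JavaConverters._

object MongoDBRefWorkaround {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnector")
      .config("spark.mongodb.output.uri", "mongodb://bigdata:bigdata@servername:27017/bigdata")
      .getOrCreate()

    // Build each user document by hand; com.mongodb.DBRef accepts any value
    // as its id, so a short string like "100" is allowed here.
    def userDoc(name: String, ids: Seq[String]): Document =
      new Document("userName", name)
        .append("dbRefId", ids.map(id => new DBRef("people", id)).asJava)

    val docsRDD = sparkSession.sparkContext.parallelize(Seq(
      userDoc("Warsaw", Seq("100", "200")),
      userDoc("Corted", Seq("100", "200"))
    ))

    // "rddMergeWorkaround" is a placeholder collection name; the database
    // comes from spark.mongodb.output.uri above.
    val writeConfig = WriteConfig(Map("collection" -> "rddMergeWorkaround"), Some(WriteConfig(sparkSession)))
    MongoSpark.save(docsRDD, writeConfig)
  }
}
{code}
This path bypasses the DataFrame schema conversion in MapFunctions, which is where the ObjectId cast fails above, but I would still prefer the DataFrame write path to support non-ObjectId $id values.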