  Spark Connector / SPARK-314

Provide pipeline push-down when loading a DataFrame via the MongoSpark companion object, to prevent the new Spark 3.1.x exception for nested duplicate columns: "AnalysisException: Found duplicate column(s) in the data schema"

    • Type: Improvement
    • Resolution: Works as Designed
    • Priority: Unknown
    • Affects Version/s: None
    • Component/s: None
    • Labels: None

      Hi, first of all, kudos to you for the great spark connector.

      Spark 3.1.x has added duplicate-column analysis for nested columns. From the migration guide:

       

      • In Spark 3.1, the Parquet, ORC, Avro and JSON datasources throw the exception org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema in read if they detect duplicate names in top-level columns as well in nested structures. The datasources take into account the SQL config spark.sql.caseSensitive while detecting column name duplicates.

       

      This new nested analysis makes it impossible to load data from any Mongo collection that has duplicated fields in nested objects. Spark doesn't even allow us to load and then drop the "problematic" columns, because when we load using the SQL API it complains before we get a chance to drop them.

      The reason this happens, from my understanding, is that the Mongo SQL connector `com.mongodb.spark.sql.DefaultSource` doesn't push down `.drop(cols)` operations to MongoDB before loading the data into Spark. On the other hand, `.filter()` operations are pushed down to MongoDB (this is documented, and I can see it implemented in the code).
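      To make the contrast concrete, here is a sketch against a placeholder collection that does not have the duplicate-field problem (the collection and field names are made up): the filter is pushed down to MongoDB, while the drop only happens on the Spark side, after the schema has already been inferred.

      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.functions.col

      val ordersDf: DataFrame = spark.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri", mongoUri)
        .option("spark.mongodb.input.database", dbName)
        .option("collection", "orders") // placeholder collection without duplicated nested fields
        .load()
        // Pushed down: translated by the connector into an aggregation $match stage,
        // so non-matching documents never leave MongoDB
        .filter(col("status") === "active")
        // Not pushed down: the field is still fetched, and the schema has
        // already been inferred before Spark applies the drop
        .drop("internalAuditTrail")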

      So, for example, back to the problematic collection: let's say I load it from MongoDB:

      val rawDf: DataFrame = spark.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri", mongoUri)
        .option("spark.mongodb.input.database", dbName)
        .option("sampleSize", 50000)
        .option("collection", myCollectionWithNestedDuplicatedColumns)
        .load()

      val cleanedDf: DataFrame = rawDf
        .drop("problematicColumn1", "problematicColumn2")


      Spark will throw org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema.

      Possible solutions:
      1) Push down a projection excluding those columns during the load (a simple solution, but limited to the initial load); see the sketch right after this list.

      2) Add a push-down for dropped columns in com.mongodb.spark.sql.MongoRelation. (I didn't dive deep enough to check whether this is possible; I'd be happy to hear feedback from those who know the code better than me.)
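      To illustrate solution 1, the pushed-down projection would be an exclusion stage along these lines (a sketch only, with placeholder field names; it gives a hypothetical definition for the mongoDropColsPushDown value used further down):

      import org.bson.Document

      // Exclusion projection: strip the offending nested fields on the MongoDB
      // side, before schema inference, so Spark never sees the duplicates
      val mongoDropColsPushDown: Seq[Document] = Seq(
        Document.parse(
          """{ "$project": { "nested.problematicColumn1": 0, "nested.problematicColumn2": 0 } }"""
        )
      )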

      I have used the first solution in my own pipelines, using the companion object com.mongodb.spark.MongoSpark to get the builder (which is public) and then bypassing the companion object's load methods to build the MongoSpark case class myself:

      val dataSourcesDf = MongoSpark
        .builder()
        .sparkSession(spark)
        .pipeline(mongoDropColsPushDown) // pushed-down aggregation pipeline (see the sketch above)
        .readConfig(readConf)
        .build()
        .toDF()
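
      (For completeness: readConf above is just a regular ReadConfig pointing at the same placeholder collection, for example:)

      import com.mongodb.spark.config.ReadConfig

      val readConf: ReadConfig = ReadConfig(Map(
        "uri"        -> mongoUri,
        "database"   -> dbName,
        "collection" -> myCollectionWithNestedDuplicatedColumns,
        "sampleSize" -> "50000"
      ))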

      This is feasible since the builder is public, but I am not sure you meant it for public use, and for less experienced developers who won't dive deep into the source code, this can be a blocker to using Spark 3.1.x together with this Mongo connector.
      Furthermore, the push-down `withPipeline` method is implemented for RDDs, but we don't want to fall back to RDD programming, I guess (or even load as an RDD and then convert to a DataFrame, which we also wouldn't expect a new user of this connector to have to reason about).
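      For reference, that RDD detour would look roughly like this (reusing the same hypothetical readConf and pipeline from above), which is exactly what I'd rather new users didn't have to know about:

      import com.mongodb.spark.MongoSpark

      // Load as an RDD, push the pipeline down, then convert back to a DataFrame
      val rddBasedDf = MongoSpark
        .load(spark.sparkContext, readConf)  // MongoRDD[Document]
        .withPipeline(mongoDropColsPushDown) // pushed down to MongoDB
        .toDF()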

      To solve this, we could simply add the pipeline as an optional parameter to the load methods in MongoSpark, so anyone can use it if needed. We would also have to document it, since this can happen to anyone using Spark 3.1.x+.
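      Roughly the shape I have in mind, expressed here as a user-side wrapper (the MongoSparkWithPipeline object and the default-empty pipeline parameter are hypothetical, not an existing API):

      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.ReadConfig
      import org.apache.spark.sql.{DataFrame, SparkSession}
      import org.bson.conversions.Bson

      object MongoSparkWithPipeline {
        // Same as MongoSpark.load(sparkSession, readConfig), plus an optional
        // aggregation pipeline that is pushed down before schema inference
        def load(sparkSession: SparkSession, readConfig: ReadConfig, pipeline: Seq[Bson] = Nil): DataFrame =
          MongoSpark.builder()
            .sparkSession(sparkSession)
            .readConfig(readConfig)
            .pipeline(pipeline)
            .build()
            .toDF()
      }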

       

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Yerachmiel Feltzman (yerachmiel@zencity.io)
            Votes: 0
            Watchers: 2
