After upgrading from 3.0.2 to 10.2.1, receiving "Failed to merge incompatible data types"


    • Type: Bug
    • Resolution: Duplicate
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: None
    • Java Drivers
      1. What would you like to communicate to the user about this feature?
      2. Would you like the user to see examples of the syntax and/or executable code and its output?
      3. Which versions of the driver/connector does this apply to?

      After upgrading the MongoDB Spark Connector from 3.0.2 to 10.2.1, the customer is getting schema merge errors. With 3.0.2 the inferSchema option eliminated the bad data while writing, which is not the case with 10.x. Is the issue actually related to how the 10.2.1 connector works? Is there a resolution?
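
      For context, the error below is Spark/Delta refusing to merge two incompatible schemas for the same column. A minimal sketch of that failure mode, assuming a Databricks/Delta environment with schema merging in play; the table name tmp_diagnoses and the sample values are hypothetical, not taken from the customer's workload:

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.getOrCreate()

      # Batch 1: `diagnoses` arrives as a struct.
      df_struct = spark.createDataFrame(
          [(1, {"code": "A00", "icdVersion": "10"})],
          "id INT, diagnoses STRUCT<code: STRING, icdVersion: STRING>",
      )

      # Batch 2: the same field arrives as a plain string.
      df_string = spark.createDataFrame(
          [(2, '{"code": "A00"}')],
          "id INT, diagnoses STRING",
      )

      df_struct.write.format("delta").mode("overwrite").saveAsTable("tmp_diagnoses")

      # The append fails: a STRUCT column cannot be merged with a STRING column,
      # which is the "Failed to merge incompatible data types" error seen in the
      # traceback below.
      (
          df_string.write.format("delta")
          .mode("append")
          .option("mergeSchema", "true")
          .saveAsTable("tmp_diagnoses")
      )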

       

      AnalysisException                          Traceback (most recent call last)
      <command-1865333763361084> in <module>
      ----> 1 create_or_append_into_table(
            2     collection="patient",
            3     target_table_name="load_patient",
            4     column_for_change_data_capture=COLUMN_FOR_CHANGE_DATA_CAPTURE,
            5     target_database=TARGET_DATABASE,

      /Workspace/Repos/cgupta@healthgorilla.com/databricks/services/utils/load.py in wrapper_timer(*args, **kwargs)
          149 def wrapper_timer(*args, **kwargs):
          150     tic = time.perf_counter()
      --> 151     value = func(*args, **kwargs)
          152     toc = time.perf_counter()
          153     elapsed_time = toc - tic

      /Workspace/Repos/cgupta@healthgorilla.com/databricks/services/utils/load.py in create_or_append_into_table(collection, target_table_name, column_for_change_data_capture, target_database, sample_size, transformations, read_partition, filter_q, unique_key, m_database, m_host, schema, env)
          583         f"ERROR!!!! When trying to execute for collection: {collection} with analysisException: {str(analysisException)}"
          584     )
      --> 585     raise analysisException
          586
          587 @custom_timer

      /Workspace/Repos/cgupta@healthgorilla.com/databricks/services/utils/load.py in create_or_append_into_table(collection, target_table_name, column_for_change_data_capture, target_database, sample_size, transformations, read_partition, filter_q, unique_key, m_database, m_host, schema, env)
          552 # Append data into table
          553 (
      --> 554     updates_df.write.format("delta")
          555     .option(
          556         "path", get_table_storage_path(database_dot_table_name=delta_table, env=env, database=target_database)

      /databricks/spark/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
          804         if format is not None:
          805             self.format(format)
      --> 806         self._jwrite.saveAsTable(name)
          807
          808     def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,

      /databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
         1302
         1303         answer = self.gateway_client.send_command(command)
      -> 1304         return_value = get_return_value(
         1305             answer, self.gateway_client, self.target_id, self.name)
         1306

      /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
          121                 # Hide where the exception came from that shows a non-Pythonic
          122                 # JVM exception message.
      --> 123                 raise converted from None
          124             else:
          125                 raise

      AnalysisException: Failed to merge fields 'diagnoses' and 'diagnoses'. Failed to merge incompatible data types StructType(StructField(_id,StringType,true),StructField(code,StringType,true),StructField(creationDate,TimestampType,true),StructField(icdVersion,StringType,true),StructField(longDescription,StringType,true),StructField(mappedDiagnoses,ArrayType(StructType(StructField(code,StringType,true),StructField(shortDescription,StringType,true),StructField(longDescription,StringType,true),StructField(icdVersion,StringType,true),StructField(_id,StringType,true),StructField(creationDate,TimestampType,true)),true),true),StructField(shortDescription,StringType,true)) and StringType

      Just to add: if the customer runs the same ETL using "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2" it works perfectly fine, but as soon as the connector is updated to "org.mongodb.spark:mongo-spark-connector_2.12:10.2.1" it starts failing.

      Here is the code they are using with 10.2.1:

      def read_collection_from_mongodb_v10(
          m_host: str,
          m_database: str,
          collection: str,
          filter: str = "1=1",
          sample_size: int = 1000,
          schema: str = None,
          env: str = 'dev',
      ) -> DataFrame:
          logger.info(f"Trying to read collection:{collection} with filter:{filter} ")
          if schema is None:
              return (
                  spark.read.format("mongodb")
                  .option("spark.mongodb.read.connection.uri", get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
                  .option("spark.mongodb.sampleSize", sample_size)
                  .option("spark.mongodb.maxBatchSize", 1024)
                  .option("sql.inferSchema.mapTypes.enabled", "false")
                  .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
                  .option("partitionSizeMB", "32")
                  .option("com.mongodb.spark.sql.connector.read.partitioner.options.samplesPerPartition", 1)
                  .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
                  .load()
                  .filter(filter)
                  .withColumn("is_deleted", lit(False))
                  .withColumn("ingestion_timestamp", current_timestamp())
                  .withColumn("ingestion_date", current_date())
              )
          else:
              return (
                  spark.read.format("mongodb")
                  .option("spark.mongodb.read.connection.uri", get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
                  .option("spark.mongodb.sampleSize", sample_size)
                  .option("spark.mongodb.maxBatchSize", 1024)
                  .option("sql.inferSchema.mapTypes.enabled", "false")
                  .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.partitionSizeMB", "32")
                  .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
                  .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
                  .schema(schema)
                  .load()
                  .filter(filter)
                  .withColumn("is_deleted", lit(False))
                  .withColumn("ingestion_timestamp", current_timestamp())
                  .withColumn("ingestion_date", current_date())
              )

      And this is the code they are using for 3.0.2:

      def read_collection_from_mongodb_v3(
          m_host: str,
          m_database: str,
          collection: str,
          filter: str = "1=1",
          sample_size: int = 1000,
          schema: str = None,
          partitionby: str = None,
          env: str = 'dev',
      ) -> DataFrame:
          logger.info(f"Trying to read collection:{collection} with filter:{filter} ")
          if schema is None:
              logger.info(f"I dont have schema with filter {filter}")
              return (
                  spark.read.format("com.mongodb.spark.sql.DefaultSource")
                  .option("spark.mongodb.input.uri", get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
                  .option("spark.mongodb.sampleSize", sample_size)
                  .option("spark.mongodb.maxBatchSize", 1024)
                  .option("spark.mongodb.batchSize", 1024)
                  .option("spark.mongodb.samplePoolSize", 10000)
                  .option("sql.inferschema.mapTypes.enabled", "false")
                  .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
                  .option("partitionSizeMB", "32")
                  .option("com.mongodb.spark.sql.connector.read.partitioner.options.samplesPerPartition", 1)
                  .load()
                  .filter(filter)
                  .withColumn("is_deleted", lit(False))
                  .withColumn("ingestion_timestamp", current_timestamp())
                  .withColumn("ingestion_date", current_date())
              )
          else:
              logger.info("I do have schema")
              return (
                  spark.read.format("com.mongodb.spark.sql.DefaultSource")
                  .option("spark.mongodb.input.uri", get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
                  .option("spark.mongodb.sampleSize", sample_size)
                  .option("spark.mongodb.maxBatchSize", 1024)
                  .option("spark.mongodb.batchSize", 1024)
                  .option("spark.mongodb.samplePoolSize", 10000)
                  .option("sql.inferschema.mapTypes.enabled", "false")
                  .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
                  .option("partitionSizeMB", "32")
                  .schema(schema)
                  .load()
                  .filter(filter)
                  .withColumn("is_deleted", lit(False))
                  .withColumn("ingestion_timestamp", current_timestamp())
                  .withColumn("ingestion_date", current_date())
              )

      Besides this, everything else is the same.
      

       

      This is supporting code the customer initially provided using
      org.mongodb.spark:mongo-spark-connector_2.12:10.2.1 (Maven)

       

      def read_collection_from_mongodb_v10(
          m_host: str,
          m_database: str,
          collection: str,
          filter: str = "1=1",
          sample_size: int = 1000,
          schema: str = None,
          env: str = 'dev',
      ) -> DataFrame:
          logger.info(f"Trying to read collection:{collection} with filter:{filter} ")
          if schema is None:
              return (
                  spark.read.format("mongodb")
                  .option("spark.mongodb.read.connection.uri", get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
                  .option("spark.mongodb.sampleSize", sample_size)
                  .option("spark.mongodb.maxBatchSize", 1024)
                  .option("sql.inferSchema.mapTypes.enabled", "false")
                  .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.partitionSizeMB", "32")
                  .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 1)
                  .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
                  .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
                  .load()
                  .filter(filter)
                  .withColumn("is_deleted", lit(False))
                  .withColumn("ingestion_timestamp", current_timestamp())
                  .withColumn("ingestion_date", current_date())
              )
          else:
              return (
                  spark.read.format("mongodb")
                  .option("spark.mongodb.read.connection.uri", get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
                  .option("spark.mongodb.sampleSize", sample_size)
                  .option("spark.mongodb.maxBatchSize", 1024)
                  .option("sql.inferSchema.mapTypes.enabled", "false")
                  .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 1)
                  .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.partitionSizeMB", "32")
                  .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
                  .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
                  .schema(schema)
                  .load()
                  .filter(filter)
                  .withColumn("is_deleted", lit(False))
                  .withColumn("ingestion_timestamp", current_timestamp())
                  .withColumn("ingestion_date", current_date())
              )
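
      For reference, a minimal sketch of the same read expressed with 10.x-style option keys, assuming the settings described in the 10.x connector configuration docs (the spark.mongodb.input.* keys kept in the helper above are the 3.x-era prefix); CONNECTION_URI and the database name are hypothetical placeholders:

      # A sketch only: exact keys should be confirmed against the 10.x configuration docs.
      df = (
          spark.read.format("mongodb")
          .option("connection.uri", CONNECTION_URI)             # hypothetical URI
          .option("database", "hypothetical_db")                # hypothetical database
          .option("collection", "patient")
          .option("sampleSize", 1000)                           # documents sampled for schema inference
          .option("sql.inferSchema.mapTypes.enabled", "false")
          .load()
      )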

       

              Assignee:
              Unassigned
              Reporter:
              Shawn Hawkins
              Votes:
              0
              Watchers:
              2