- Type: Bug
- Resolution: Duplicate
- Priority: Major - P3
- Affects Version/s: None
- Component/s: None
- (copied to CRM)
- Java Drivers
The issue: after upgrading the MongoDB Spark Connector from 3.0.2 to 10.2.1, the customer is getting merge errors. With 3.0.2 the inferSchema option eliminated bad data while writing, which is not the case with 10.x. Is the issue actually related to how connector 10.2.1 works? Is there a resolution?
AnalysisException                          Traceback (most recent call last)
<command-1865333763361084> in <module>
----> 1 create_or_append_into_table(
      2     collection="patient",
      3     target_table_name = "load_patient",
      4     column_for_change_data_capture=COLUMN_FOR_CHANGE_DATA_CAPTURE,
      5     target_database=TARGET_DATABASE,

/Workspace/Repos/cgupta@healthgorilla.com/databricks/services/utils/load.py in wrapper_timer(*args, **kwargs)
    149 def wrapper_timer(*args, **kwargs):
    150     tic = time.perf_counter()
--> 151     value = func(*args, **kwargs)
    152     toc = time.perf_counter()
    153     elapsed_time = toc - tic

/Workspace/Repos/cgupta@healthgorilla.com/databricks/services/utils/load.py in create_or_append_into_table(collection, target_table_name, column_for_change_data_capture, target_database, sample_size, transformations, read_partition, filter_q, unique_key, m_database, m_host, schema, env)
    583     f"ERROR!!!! When trying to execute for collection: {collection} with analysisException: {str(analysisException)}"
    584 )
--> 585 raise analysisException
    586
    587 @custom_timer

/Workspace/Repos/cgupta@healthgorilla.com/databricks/services/utils/load.py in create_or_append_into_table(collection, target_table_name, column_for_change_data_capture, target_database, sample_size, transformations, read_partition, filter_q, unique_key, m_database, m_host, schema, env)
    552 # Append data into table
    553 (
--> 554     updates_df.write.format("delta")
    555     .option(
    556         "path", get_table_storage_path(database_dot_table_name=delta_table, env = env, database=target_database)

/databricks/spark/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
    804 if format is not None:
    805     self.format(format)
--> 806 self._jwrite.saveAsTable(name)
    807
    808 def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1306

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121 # Hide where the exception came from that shows a non-Pythonic
    122 # JVM exception message.
--> 123 raise converted from None
    124 else:
    125     raise

AnalysisException: Failed to merge fields 'diagnoses' and 'diagnoses'.
Failed to merge incompatible data types StructType(StructField(_id,StringType,true),StructField(code,StringType,true),StructField(creationDate,TimestampType,true),StructField(icdVersion,StringType,true),StructField(longDescription,StringType,true),StructField(mappedDiagnoses,ArrayType(StructType(StructField(code,StringType,true),StructField(shortDescription,StringType,true),StructField(longDescription,StringType,true),StructField(icdVersion,StringType,true),StructField(_id,StringType,true),StructField(creationDate,TimestampType,true)),true),true),StructField(shortDescription,StringType,true)) and StringType

Just to add, if the customer runs the same ETL using "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2" it works perfectly fine, but as soon as I push the connector update to "org.mongodb.spark:mongo-spark-connector_2.12:10.2.1" it starts failing.

Here is the code they are using with 10.2.1:

def read_collection_from_mongodb_v10(
    m_host: str,
    m_database: str,
    collection: str,
    filter: str = "1=1",
    sample_size: int = 1000,
    schema: str = None,
    env: str = 'dev'
) -> DataFrame:
    logger.info(f"Trying to read collection:{collection} with filter:{filter} ")
    if schema is None:
        return (
            spark.read.format("mongodb")
            .option("spark.mongodb.read.connection.uri",
                    get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
            .option("spark.mongodb.sampleSize", sample_size)
            .option("spark.mongodb.maxBatchSize", 1024)
            .option("sql.inferSchema.mapTypes.enabled", "false")
            .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
            .option("partitionSizeMB", "32")
            .option("com.mongodb.spark.sql.connector.read.partitioner.options.samplesPerPartition", 1)
            .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
            .load()
            .filter(filter)
            .withColumn("is_deleted", lit(False))
            .withColumn("ingestion_timestamp", current_timestamp())
            .withColumn("ingestion_date", current_date())
        )
    else:
        return (
            spark.read.format("mongodb")
            .option("spark.mongodb.read.connection.uri",
                    get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
            .option("spark.mongodb.sampleSize", sample_size)
            .option("spark.mongodb.maxBatchSize", 1024)
            .option("sql.inferSchema.mapTypes.enabled", "false")
            .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.partitionSizeMB", "32")
            .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
            .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
            .schema(schema)
            .load()
            .filter(filter)
            .withColumn("is_deleted", lit(False))
            .withColumn("ingestion_timestamp", current_timestamp())
            .withColumn("ingestion_date", current_date())
        )

and this is the code they are using with 3.0.2:

def read_collection_from_mongodb_v3(
    m_host: str,
    m_database: str,
    collection: str,
    filter: str = "1=1",
    sample_size: int = 1000,
    schema: str = None,
    partitionby: str = None,
    env: str = 'dev'
) -> DataFrame:
    logger.info(f"Trying to read collection:{collection} with filter:{filter} ")
    if schema is None:
        logger.info(f"I dont have schema with filter {filter}")
        return (
            spark.read.format("com.mongodb.spark.sql.DefaultSource")
            .option("spark.mongodb.input.uri",
                    get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
            .option("spark.mongodb.sampleSize", sample_size)
            .option("spark.mongodb.maxBatchSize", 1024)
            .option("spark.mongodb.batchSize", 1024)
            .option("spark.mongodb.samplePoolSize", 10000)
            .option("sql.inferschema.mapTypes.enabled", "false")
            .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
            .option("partitionSizeMB", "32")
            .option("com.mongodb.spark.sql.connector.read.partitioner.options.samplesPerPartition", 1)
            .load()
            .filter(filter)
            .withColumn("is_deleted", lit(False))
            .withColumn("ingestion_timestamp", current_timestamp())
            .withColumn("ingestion_date", current_date())
        )
    else:
        logger.info("I do have schema")
        return (
            spark.read.format("com.mongodb.spark.sql.DefaultSource")
            .option("spark.mongodb.input.uri",
                    get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
            .option("spark.mongodb.sampleSize", sample_size)
            .option("spark.mongodb.maxBatchSize", 1024)
            .option("spark.mongodb.batchSize", 1024)
            .option("spark.mongodb.samplePoolSize", 10000)
            .option("sql.inferschema.mapTypes.enabled", "false")
            .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
            .option("partitionSizeMB", "32")
            .schema(schema)
            .load()
            .filter(filter)
            .withColumn("is_deleted", lit(False))
            .withColumn("ingestion_timestamp", current_timestamp())
            .withColumn("ingestion_date", current_date())
        )

Besides this, everything else is the same.
This is the supporting code the customer initially provided, using org.mongodb.spark:mongo-spark-connector_2.12:10.2.1 (Maven):
def read_collection_from_mongodb_v10(
    m_host: str,
    m_database: str,
    collection: str,
    filter: str = "1=1",
    sample_size: int = 1000,
    schema: str = None,
    env: str = 'dev'
) -> DataFrame:
    logger.info(f"Trying to read collection:{collection} with filter:{filter} ")
    if schema is None:
        return (
            spark.read.format("mongodb")
            .option("spark.mongodb.read.connection.uri",
                    get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
            .option("spark.mongodb.sampleSize", sample_size)
            .option("spark.mongodb.maxBatchSize", 1024)
            .option("sql.inferSchema.mapTypes.enabled", "false")
            .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.partitionSizeMB", "32")
            .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 1)
            .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
            .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
            .load()
            .filter(filter)
            .withColumn("is_deleted", lit(False))
            .withColumn("ingestion_timestamp", current_timestamp())
            .withColumn("ingestion_date", current_date())
        )
    else:
        return (
            spark.read.format("mongodb")
            .option("spark.mongodb.read.connection.uri",
                    get_connection_string(collection, env=env, m_host=m_host, m_database=m_database))
            .option("spark.mongodb.sampleSize", sample_size)
            .option("spark.mongodb.maxBatchSize", 1024)
            .option("sql.inferSchema.mapTypes.enabled", "false")
            .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 1)
            .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.partitionSizeMB", "32")
            .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
            .option("spark.mongodb.input.partitionerOptions.MongoSamplePartitioner.samplePoolSize", "10000")
            .schema(schema)
            .load()
            .filter(filter)
            .withColumn("is_deleted", lit(False))
            .withColumn("ingestion_timestamp", current_timestamp())
            .withColumn("ingestion_date", current_date())
        )
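For completeness, a hypothetical way to feed an explicit schema through the customer's own helper is as a Spark DDL string, matching the schema: str parameter so the reader takes the .schema(...) branch instead of inferring types. Only the diagnoses field is spelled out (copied from the exception); the host, database, collection name, and remaining fields are placeholders.

# Hypothetical usage sketch: call the v10 helper with a DDL schema string.
# Everything except the 'diagnoses' layout is a placeholder.
patient_schema_ddl = """
    _id STRING,
    diagnoses STRUCT<
        _id: STRING,
        code: STRING,
        creationDate: TIMESTAMP,
        icdVersion: STRING,
        longDescription: STRING,
        mappedDiagnoses: ARRAY<STRUCT<
            code: STRING,
            shortDescription: STRING,
            longDescription: STRING,
            icdVersion: STRING,
            _id: STRING,
            creationDate: TIMESTAMP>>,
        shortDescription: STRING>
"""

patient_df = read_collection_from_mongodb_v10(
    m_host="<host>",           # placeholder
    m_database="<database>",   # placeholder
    collection="patient",
    schema=patient_schema_ddl, # non-None, so .schema(...) is applied and inference is skipped
    env="dev",
)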