05-02-2023 01:38 AM
Databricks Runtime: 12.2 LTS, Spark: 3.3.2, Delta Lake: 2.2.0
Given a target table with schema [c1: integer, c2: integer], Delta Lake lets us MERGE data with schema [c1: integer, c2: double] into it. I expected it to throw an exception (as a normal Spark write/INSERT operation does), but instead it stored the data with a mismatched datatype for field c2.
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from delta import DeltaTable
# Source data
schema = StructType([StructField("c1", IntegerType(), False), StructField("c2", DoubleType(), False)])
rdd_output = spark.sparkContext.parallelize([(4, 1.4), (5, 5.0), (6, 3.5),])
df_source = spark.createDataFrame(rdd_output, schema=schema)
# write source to target table using merge
target_table = DeltaTable.forName(spark, "default.test_datatype_misalignment")
merge = target_table.alias("target").merge(df_source.alias("source"), "target.c1 = source.c1")
merge.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
spark.table("default.test_datatype_misalignment").show()
# OUTPUT
#+---+---+
#| c1| c2|
#+---+---+
#| 1| 1|
#| 2| 1|
#| 3| 5|
#| 4| 1|
#| 5| 5|
#| 6| 3|
#+---+---+
# write source to target table using insert
df_source.write.format("delta").mode("append").saveAsTable("default.test_datatype_misalignment")
# OUTPUT
#AnalysisException: Failed to merge fields 'c2' and 'c2'. Failed to merge incompatible data types IntegerType and DoubleType
I'm expecting an exception to be raised regardless of the write command; why is this not the case?
05-02-2023 04:32 AM
Perhaps schema evolution is enabled?
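One quick way to rule that out (a minimal sketch; spark.databricks.delta.schema.autoMerge.enabled is the standard Delta Lake setting, adjust if your workspace overrides it) is to print the config:
# Check whether automatic schema evolution for MERGE is enabled
# (defaults to "false"; "true" lets MERGE evolve the target schema)
print(spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled", "false"))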
05-09-2023 01:33 PM
Hi @Sigrun Nordli, you're correct that the behaviour you're observing is not intuitive. It comes down to how the MERGE operation in Delta Lake works in conjunction with Apache Spark™'s type coercion rules. When you use MERGE, Delta Lake performs an UPSERT (update or insert) rather than a traditional INSERT. This operation inherently involves comparing columns to decide which rows need to be updated, which should be inserted, and which should be left as is.
In your case the merge condition, target.c1 = source.c1, compares two integer columns, so the condition itself is not where the mismatch matters. It is the update and insert clauses: when whenMatchedUpdateAll / whenNotMatchedInsertAll assign source.c2 (a double) to target.c2 (an integer), Apache Spark™'s implicit casting rules come into play and the double values are silently cast to the target's integer type. The cast is considered valid, so no exception is raised, even though the fractional part is dropped (which is why 1.4 and 3.5 show up as 1 and 3 in your output).
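You can reproduce the effect of that implicit cast on the source data directly; this is just an illustrative sketch reusing df_source from your snippet:
from pyspark.sql.functions import col
# Casting the double column to the target's integer type drops the fractional part,
# matching what ends up in the table after the MERGE (c2 becomes 1, 5 and 3)
df_source.select(col("c1"), col("c2").cast("int").alias("c2")).show()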
However, when you use a plain INSERT/append write, Delta expects the schema of the source DataFrame to match the target table's schema exactly. If the types don't match, it throws an AnalysisException, as you've observed.
So, to answer your question, the difference in behaviour is because the MERGE operation handles type mismatches differently than the INSERT operation. This results from Delta Lake's merge operation and Apache Spark™'s type coercion rules.
However, if you want to enforce strict schema checks with MERGE, you must implement them manually in your code before performing the merge operation; a rough sketch follows below.
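For example (a minimal sketch reusing df_source, merge and the table name from your snippet; the guard itself is only an illustration, not a built-in Delta feature):
# Hypothetical pre-merge guard: fail fast if source and target column types differ
target_types = dict(spark.table("default.test_datatype_misalignment").dtypes)  # e.g. {"c1": "int", "c2": "int"}
source_types = dict(df_source.dtypes)                                          # e.g. {"c1": "int", "c2": "double"}
mismatched = {
    name: (source_types[name], target_types[name])
    for name in source_types
    if name in target_types and source_types[name] != target_types[name]
}
if mismatched:
    raise ValueError(f"Source/target type mismatch: {mismatched}")
# Only reached when the schemas line up; safe to run the merge from your snippet
merge.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()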
05-18-2023 11:31 PM
Hi @Sigrun Nordli
Thank you for posting your question in our community! We are happy to assist you.
To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?
This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance!