Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Table Schema Error

Dave_Nithio
Contributor

I'm using Delta Live Tables to load a set of csv files in a directory. I am pre-defining the schema to avoid issues with schema inference. This works with Auto Loader on a regular Delta table, but is failing for Delta Live Tables. Below is an example of the code I am using to define the schema and load into DLT:

# Define Schema
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
  StructField("ID", StringType(), True, {'comment': "Unique customer id"}),
  StructField("Test", StringType(), True, {'comment': "this is a test"}),
  ...
])
 
# Define Delta Live Table
import dlt

@dlt.table(
  name="test_bronze",
  comment="Test data incrementally ingested from S3 Raw landing zone",
  table_properties={
    "quality": "bronze"
  },
  schema=schema
)
# Read Stream
def rafode_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", source_format)  # format is csv
      .option("inferSchema", "False")
      .option("header", "True")
      .schema(schema)
      .load(data_source)  # data_source is S3 directory
  )

When attempting to run this as a Delta Live Tables pipeline, I get the following error:

org.apache.spark.sql.AnalysisException: Failed to merge fields 'Test' and 'Test'. Failed to merge incompatible data types IntegerType and DoubleType

I have attempted running the readStream with and without the `.option("inferSchema", "False")` option to see whether that allows the pre-defined schema to be used instead of an inferred schema, but I run into the same error. It seems as though spark.readStream is not applying the pre-defined schema on each read of the csv files in the directory, which causes schema differences and a failure to load. Do I need to alter my readStream code to force the use of my schema, or am I missing something else?

1 ACCEPTED SOLUTION


Kaniz_Fatma
Community Manager

Hi @Dave Wilson, Delta tables perform schema validation on every column: the source DataFrame's column data types must match the column data types in the target table. If they don't match, an exception is raised.

For reference:

https://docs.databricks.com/delta/delta-batch.html#schema-validation-1

To avoid this, you can cast the column explicitly before writing it to the target table.
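
A minimal sketch of that suggestion, applied to the table definition from the question. The column name "Test" and the StringType target come from the schema in the original post; adjust the cast to whichever column is actually mismatched.

from pyspark.sql.functions import col
from pyspark.sql.types import StringType

@dlt.table(
  name="test_bronze",
  comment="Test data incrementally ingested from S3 Raw landing zone",
  table_properties={"quality": "bronze"},
  schema=schema
)
def rafode_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "True")
      .schema(schema)
      .load(data_source)
      # Cast explicitly so every micro-batch matches the target table's type
      .withColumn("Test", col("Test").cast(StringType()))
  )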


2 REPLIES 2


shagun
New Contributor III

I was facing a similar issue when loading JSON files through Auto Loader for Delta Live Tables.

I was able to fix it with this option:

.option("cloudFiles.inferColumnTypes", "True")

From the docs: "For formats that don’t encode data types (JSON and CSV), Auto Loader infers all columns as strings (including nested fields in JSON files)."

https://docs.databricks.com/ingestion/auto-loader/schema.html#
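
For illustration, a rough sketch of how that option fits into a DLT Auto Loader read of a JSON directory (the table name and json_source path are hypothetical placeholders, not from the original post):

import dlt

@dlt.table(
  name="events_bronze",
  comment="JSON ingested with Auto Loader column type inference"
)
def events_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      # Infer column types instead of reading every column as a string
      .option("cloudFiles.inferColumnTypes", "True")
      .load(json_source)
  )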
