Delta Live Table Schema Error

Dave_Nithio
Contributor

I'm using Delta Live Tables to load a set of csv files in a directory. I am pre-defining the schema to avoid issues with schema inference. This works with Auto Loader on a regular Delta table, but it fails for Delta Live Tables. Below is an example of the code I am using to define the schema and load into DLT:

# Imports used by the snippet below
import dlt
from pyspark.sql.types import StructType, StructField, StringType

# Define schema
schema = StructType([
  StructField("ID", StringType(), True, {'comment': "Unique customer id"}),
  StructField("Test", StringType(), True, {'comment': "this is a test"}),
  ...
])

# Define Delta Live Table
@dlt.table(
  name="test_bronze",
  comment="Test data incrementally ingested from S3 Raw landing zone",
  table_properties={
    "quality": "bronze"
  },
  schema=schema
)
# Read stream
def rafode_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", source_format)  # source_format is csv
      .option("inferSchema", "False")
      .option("header", "True")
      .schema(schema)
      .load(data_source)  # data_source is an S3 directory
  )

When attempting to run this as a Delta Live Table pipeline, I get the following error:

org.apache.spark.sql.AnalysisException: Failed to merge fields 'Test' and 'Test'. Failed to merge incompatible data types IntegerType and DoubleType

I have attempted running the readStream with and without `.option("inferSchema", "False")` to see if that allows the pre-defined schema to be used instead of an inferred one, but I run into the same error. It seems as though spark.readStream is not using the pre-defined schema on each read of the csv files in the directory, which is causing schema differences and a failure to load. Do I need to alter my readStream code to force the use of my schema, or am I missing something else?


2 REPLIES

Kaniz
Community Manager (Accepted Solution)

Hi @Dave Wilson, the Delta table performs schema validation on every column, and the source DataFrame column data types must match the column data types in the target table. If they don't match, an exception is raised.

For reference:

https://docs.databricks.com/delta/delta-batch.html#schema-validation-1

To avoid this, you can cast the column explicitly before writing it to the target table.
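As a minimal sketch, here is roughly what that cast could look like applied to the streaming read from the question (the column name and target type are only illustrative; cast to whatever type the target table actually declares):

from pyspark.sql.functions import col

@dlt.table(name="test_bronze", comment="Test data incrementally ingested from S3 Raw landing zone")
def rafode_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "True")
      .load(data_source)
      # Cast the conflicting column explicitly so every file arrives with
      # the same data type as the target table column.
      .withColumn("Test", col("Test").cast("string"))
  )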

shagun
New Contributor III

I was facing a similar issue loading JSON files through Auto Loader for Delta Live Tables.

I was able to fix it with this option:

.option("cloudFiles.inferColumnTypes", "True")

From the docs: "For formats that don't encode data types (JSON and CSV), Auto Loader infers all columns as strings (including nested fields in JSON files)."

https://docs.databricks.com/ingestion/auto-loader/schema.html#
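In terms of the code in the original question, that option would slot in roughly like this (a sketch only, reusing the table and variable names from the question and dropping the explicit schema so Auto Loader can infer the column types):

@dlt.table(name="test_bronze", comment="Test data incrementally ingested from S3 Raw landing zone")
def rafode_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "True")
      # Ask Auto Loader to infer real column types instead of treating
      # every CSV column as a string.
      .option("cloudFiles.inferColumnTypes", "True")
      .load(data_source)
  )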
