Delta Live Table Schema Error

Dave_Nithio
Contributor

I'm using Delta Live Tables to load a set of CSV files from a directory. I am pre-defining the schema to avoid issues with schema inference. This works with Auto Loader on a regular Delta table, but is failing for Delta Live Tables. Below is an example of the code I am using to define the schema and load into DLT:

import dlt
from pyspark.sql.types import StructType, StructField, StringType

# Define Schema
schema = StructType([
  StructField("ID", StringType(), True, {'comment': "Unique customer id"}),
  StructField("Test", StringType(), True, {'comment': "this is a test"}),
  ...
])

# Define Delta Live Table
@dlt.table(name="test_bronze",
  comment="Test data incrementally ingested from S3 Raw landing zone",
  table_properties={
    "quality": "bronze"
  },
  schema=schema
)
# Read Stream
def rafode_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", source_format)  # format is csv
      .option("inferSchema", "False")
      .option("header", "True")
      .schema(schema)
      .load(data_source)  # data_source is S3 directory
  )

When attempting to run this as a Delta Live Tables pipeline, I get the following error:

org.apache.spark.sql.AnalysisException: Failed to merge fields 'Test' and 'Test'. Failed to merge incompatible data types IntegerType and DoubleType

I have attempted running the readStream with and without the `.option("inferSchema", "False")` to see if that allows the pre-defined schema to be used instead of an inferred schema, but I run into the same error. It seems as though spark.readStream is not applying the pre-defined schema on each read of the CSV files in the directory, which causes the schema differences and the failure to load. Do I need to alter my readStream code to force the use of my schema, or am I missing something else?

1 ACCEPTED SOLUTION

Kaniz
Community Manager

Hi @Dave Wilson, Delta tables perform schema validation on every column, and the source DataFrame column data types must match the column data types in the target table. If they don't match, an exception is raised.

For reference:

https://docs.databricks.com/delta/delta-batch.html#schema-validation-1

To avoid this, you can cast the column explicitly before writing it to the target table.
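A minimal sketch of what that explicit cast could look like inside the DLT table function; this is not code from the original post, and it assumes the conflicting column is the "Test" column from the error above and that it should land as a string to match the pre-defined schema:

import dlt
from pyspark.sql.functions import col

@dlt.table(name="test_bronze",
  comment="Test data with the conflicting column cast explicitly"
)
def test_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "True")
      .load(data_source)  # data_source is the S3 directory from the question
      .withColumn("Test", col("Test").cast("string"))  # explicit cast so the source type matches the target table
  )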



shagun
New Contributor III

I was facing a similar issue when loading JSON files through Auto Loader for Delta Live Tables.

I was able to fix it with this option:

.option("cloudFiles.inferColumnTypes", "True")

From the docs: "For formats that don’t encode data types (JSON and CSV), Auto Loader infers all columns as strings (including nested fields in JSON files)."

https://docs.databricks.com/ingestion/auto-loader/schema.html#
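For illustration, a minimal sketch of where cloudFiles.inferColumnTypes fits in an Auto Loader read inside a DLT table; the table name, JSON format, and S3 path here are made up, not taken from the reply:

import dlt

@dlt.table(name="events_bronze",
  comment="Bronze table ingested with Auto Loader; column types inferred instead of defaulting to strings"
)
def events_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.inferColumnTypes", "True")  # infer numeric/nested types rather than treating every column as a string
      .load("s3://my-bucket/raw/events/")  # hypothetical path
  )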
