11-01-2022 02:03 PM
I'm using Delta Live Tables to load a set of csv files in a directory. I am pre-defining the schema to avoid issues with schema inference. This works with autoloader on a regular delta table, but is failing for Delta Live Tables. Below is an example of the code I am using to define the schema and load into DLT:
# Imports
import dlt
from pyspark.sql.types import StructType, StructField, StringType

# Define the schema
schema = StructType([
    StructField("ID", StringType(), True, {'comment': "Unique customer id"}),
    StructField("Test", StringType(), True, {'comment': "this is a test"}),
    ...
])

# Define the Delta Live Table
@dlt.table(
    name="test_bronze",
    comment="Test data incrementally ingested from S3 Raw landing zone",
    table_properties={"quality": "bronze"},
    schema=schema
)
# Read stream
def rafode_bronze():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", source_format)  # source_format is "csv"
            .option("inferSchema", "False")
            .option("header", "True")
            .schema(schema)
            .load(data_source)  # data_source is the S3 directory
    )
When attempting to run this as a Delta Live Tables pipeline, I get this error:
org.apache.spark.sql.AnalysisException: Failed to merge fields 'Test' and 'Test'. Failed to merge incompatible data types IntegerType and DoubleType
I have tried running the readStream both with and without the `.option("inferSchema", "False")` to see if that allows the pre-defined schema to be used instead of an inferred one, but I run into the same error. It seems as though spark.readStream is not applying the pre-defined schema on each read of the csv files in the directory, which causes schema differences between files and a failure to load. Do I need to alter my readStream code to force the use of my schema, or am I missing something else?
11-17-2022 03:23 AM
Hi @Dave Wilson, Delta tables perform schema validation on every column, and the source DataFrame's column data types must match the column data types in the target table. If they don't match, an exception is raised.
For reference-
https://docs.databricks.com/delta/delta-batch.html#schema-validation-1
To avoid this, you can cast the column explicitly before writing it to the target table.
06-01-2023 06:39 AM
I was facing a similar issue loading JSON files through Auto Loader for Delta Live Tables.
I was able to fix it with this option:
.option("cloudFiles.inferColumnTypes", "True")
From the docs "For formats that don’t encode data types (JSON and CSV), Auto Loader infers all columns as strings (including nested fields in JSON files)."
https://docs.databricks.com/ingestion/auto-loader/schema.html#
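To show where the option slots in, here is a hedged sketch based on the readStream from the original question (`data_source` is assumed from that post, and the code runs only inside a Delta Live Tables pipeline, where `spark` and `dlt` are provided):

```python
import dlt

@dlt.table(name="test_bronze", comment="CSV ingested with inferred column types")
def test_bronze():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            # Without this, Auto Loader infers every CSV/JSON column as a string;
            # with it, column types are inferred (e.g. IntegerType, DoubleType).
            .option("cloudFiles.inferColumnTypes", "True")
            .option("header", "True")
            .load(data_source)  # data_source is the S3 directory from the question
    )
```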