Delta Live Table Schema Error
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-01-2022 02:03 PM
I'm using Delta Live Tables to load a set of csv files in a directory. I am pre-defining the schema to avoid issues with schema inference. This works with autoloader on a regular delta table, but is failing for Delta Live Tables. Below is an example of the code I am using to define the schema and load into DLT:
# Define Schema
schema = StructType([
StructField("ID",StringType(),True, {'comment': "Unique customer id"}),
StructField("Test",StringType(),True, {'comment': "this is a test"}),
...)]
# Define Delta Live Table
@dlt.table(name="test_bronze",
comment = "Test data incrementally ingested from S3 Raw landing zone",
table_properties={
"quality": "bronze"
},
schema=schema
)
# Read Stream
def rafode_bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", source_format) # format is csv
.option("inferSchema", "False")
.option("header", "True")
.schema(schema)
.load(data_source) # data_source is S3 directory
)
When attempting to run this as a Delta Live Table pipeline, I get an error that:
org.apache.spark.sql.AnalysisException: Failed to merge fields 'Test' and 'Test. Failed to merge incompatible data types IntegerType and DoubleType
I have attempted running the readStream with and without the `.option("inferSchema", "False")` to see if that allows for the pre-defined schema to be used vs an infered schema, but I run into the same error. It seems as though spark.readStream is not using the pre-defined schema on each read of the csv files in the directory which is causing schema differences and failure to load. Do I need to alter my readStream code to force the use of my schema or am I missing something else?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
06-01-2023 06:39 AM
i was facing similar issue in loading json files through autoloader for delta live tables.
Was able to fix with this option
.option("cloudFiles.inferColumnTypes", "True")
From the docs "For formats that don’t encode data types (JSON and CSV), Auto Loader infers all columns as strings (including nested fields in JSON files)."
https://docs.databricks.com/ingestion/auto-loader/schema.html#