I'm using Delta Live Tables to load a set of CSV files from a directory. I'm pre-defining the schema to avoid issues with schema inference. The same approach works with Auto Loader writing to a regular Delta table (a sketch of that working version is included after the DLT code below for comparison), but it fails for Delta Live Tables. Below is an example of the code I'm using to define the schema and load into DLT:
```python
# Imports (pyspark.sql.types for the schema definition, dlt for the pipeline)
import dlt
from pyspark.sql.types import StructType, StructField, StringType

# Define schema
schema = StructType([
    StructField("ID", StringType(), True, {'comment': "Unique customer id"}),
    StructField("Test", StringType(), True, {'comment': "this is a test"}),
    ...
])

# Define Delta Live Table
@dlt.table(
    name="test_bronze",
    comment="Test data incrementally ingested from S3 Raw landing zone",
    table_properties={"quality": "bronze"},
    schema=schema
)
# Read stream
def rafode_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)  # source_format is "csv"
        .option("inferSchema", "False")
        .option("header", "True")
        .schema(schema)
        .load(data_source)  # data_source is the S3 directory
    )
```
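
For comparison, this is roughly the shape of the non-DLT Auto Loader version that works for me against a regular Delta table; the S3 paths, checkpoint location, and target table name here are placeholders:

```python
# Non-DLT Auto Loader ingest into a regular Delta table using the same pre-defined schema.
# The paths and table name below are placeholder values.
(
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "True")
    .schema(schema)                       # same pre-defined schema as above
    .load("s3://bucket/raw/")
    .writeStream
    .option("checkpointLocation", "s3://bucket/checkpoints/test_bronze")
    .trigger(availableNow=True)           # process available files, then stop
    .toTable("test_bronze_regular")
)
```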
When attempting to run this as a Delta Live Tables pipeline, I get the following error:

```
org.apache.spark.sql.AnalysisException: Failed to merge fields 'Test' and 'Test'. Failed to merge incompatible data types IntegerType and DoubleType
```
I have tried running the readStream both with and without `.option("inferSchema", "False")` to see whether that lets the pre-defined schema take precedence over an inferred one, but I hit the same error either way (a sketch of the variant without the option is below). It seems as though spark.readStream is not applying the pre-defined schema to each CSV file it reads from the directory, which is causing schema differences between files and the failure to load. Do I need to alter my readStream code to force the use of my schema, or am I missing something else?
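
For reference, this is the variant I also tried, with the inferSchema option removed and everything else unchanged; it produces the identical error:

```python
# Same DLT table definition, minus the inferSchema option; fails with the same AnalysisException.
@dlt.table(
    name="test_bronze",
    comment="Test data incrementally ingested from S3 Raw landing zone",
    table_properties={"quality": "bronze"},
    schema=schema
)
def rafode_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)  # "csv"
        .option("header", "True")
        .schema(schema)
        .load(data_source)
    )
```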