Summary
I am building a pipeline in Databricks with Delta Live Tables (DLT). Files with different schemas are stored in the same folder of a data lake, and DLT merges their schemas into every table, even though I use a file name pattern to separate the ingestion.
Details
I have two files in the same path of a storage account; call them file_1 and file_2. They are stored in an ABFSS location like:
abfss://<container>@<storage_account>.dfs.core.windows.net/path/to/folder/
file_1 has the following schema:
- column_a
- column_b
- column_c
file_2 has the following schema:
- column_d
- column_e
- column_f
This is the code I use to create one of the Delta Live Tables:
CREATE OR REFRESH STREAMING LIVE TABLE table_number_1 (
  CONSTRAINT correct_schema EXPECT (_rescued_data IS NULL)
)
PARTITIONED BY (_JOB_UPDATED_PARTITION_YEAR, _JOB_UPDATED_PARTITION_MONTH)
COMMENT "Comment for table 1"
TBLPROPERTIES (
  -- Quality flag
  'mypipeline.quality' = 'bronze',
  -- Delta Live Tables table properties
  'pipelines.autoOptimize.managed' = 'true',
  'pipelines.reset.allowed' = 'true',
  'pipelines.autoOptimize.zOrderCols' = '_JOB_UPDATED_PARTITION_YEAR, _JOB_UPDATED_PARTITION_MONTH',
  -- Delta table options
  'delta.enableDeletionVectors' = 'true',
  'delta.deletedFileRetentionDuration' = '30 days',
  'delta.enableChangeDataFeed' = 'true',
  'delta.feature.timestampNtz' = 'supported',
  -- Enables special characters in column names
  'delta.columnMapping.mode' = 'name',
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5'
)
AS
SELECT
  *,
  _metadata.file_path AS _JOB_SOURCE_FILE,
  _metadata.file_name AS _FILE_NAME,
  CONVERT_TIMEZONE('UTC', 'America/Belem', _metadata.file_modification_time) AS _MODIFICATION_TIME_BR,
  CONVERT_TIMEZONE('UTC', 'America/Belem', current_timestamp()) AS _JOB_UPDATED_TIME_BR,
  year(_JOB_UPDATED_TIME_BR) AS _JOB_UPDATED_PARTITION_YEAR,
  month(_JOB_UPDATED_TIME_BR) AS _JOB_UPDATED_PARTITION_MONTH,
  'Datalake' AS _DATA_ORIGIN
FROM cloud_files(
  "abfss://<container>@<storage_account>.dfs.core.windows.net/path/to/folder/",
  'csv',
  map(
    -- Generic options
    'fileNamePattern', '*file_1*',
    -- Common Auto Loader options
    'cloudFiles.allowOverwrites', 'true',
    'cloudFiles.inferColumnTypes', 'true',
    'cloudFiles.maxFilesPerTrigger', '1',
    -- CSV file format options
    'ignoreLeadingWhiteSpace', 'true',
    'ignoreTrailingWhiteSpace', 'true',
    'encoding', 'UTF-8',
    'header', 'true',
    'mode', 'FAILFAST',
    'multiLine', 'false',
    'readerCaseSensitive', 'false',
    'delimiter', ',',
    'skipRows', '0',
    -- Escape config
    'quote', '\"',
    'escape', '\"',
    'unescapedQuoteHandling', 'BACK_TO_DELIMITER'
  )
);
In this code, I point the cloud_files() function [link] at the ABFSS path. Because the folder contains files with different schemas, I also pass a fileNamePattern, for example '*file_1*'. According to the Auto Loader documentation [link], this is a generic option that provides a glob pattern for choosing files in a folder.
The problem is that when I start the DLT pipeline to create the two tables, both are materialized with the following schema:
- column_a
- column_b
- column_c
- column_d
- column_e
- column_f
That is, when inferring the schema, DLT (Delta Live Tables) seems to combine all the files in the folder instead of applying the fileNamePattern I passed to the cloud_files() function.
That is strange, because if I use `Auto Loader` through the read_files() [link] function instead of `DLT`, the tables are created as expected.
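For comparison, here is a minimal sketch of what the read_files() variant could look like. It assumes the same folder and glob as the cloud_files() call above, trims the option list to the essentials, and is not the exact statement from my workspace.

```sql
-- Hypothetical sketch: same folder and glob as the cloud_files() call,
-- read through read_files() with named arguments.
-- Only a few of the CSV options are repeated here for brevity.
SELECT
  *,
  _metadata.file_path AS _JOB_SOURCE_FILE
FROM read_files(
  'abfss://<container>@<storage_account>.dfs.core.windows.net/path/to/folder/',
  format => 'csv',
  fileNamePattern => '*file_1*',  -- glob that should restrict the read to file_1
  header => true
);
```

The Auto Loader options reference lists the glob option as "pathGlobFilter or fileNamePattern" and notes that fileNamePattern can be used in read_files. If that means cloud_files() ignores fileNamePattern, then passing 'pathGlobFilter' in the options map, or putting the glob directly in the path, might be the cloud_files() equivalent. This is an assumption that I have not verified against this pipeline.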
The main question
Does anyone know how to stop DLT from merging the schemas, so that each table only picks up the files matching its pattern?