11-01-2023 07:06 AM
Hi,
I'm implementing a DLT pipeline using Auto Loader to ingest JSON files. The JSON files contain an array called Items that holds records, and two of the fields in those records weren't part of the original schema but were added later. Auto Loader has updated the schema, and when I query the bronze table in Databricks SQL, I can select the two added fields: they are NULL in the data loaded before the fields existed in the JSON files and contain values in more recent data.
However, in the notebook that creates the silver table, the DLT pipeline fails with the message "Unable to process statement for View 'VehicleInspectionProtocol_Flattened_Temp'. [FIELD_NOT_FOUND] No such struct field `RegistrationDate` in ...". RegistrationDate is one of the fields that was not part of the records initially. Can I implement the notebook in a better way?
CREATE TEMPORARY STREAMING LIVE VIEW VehicleInspectionProtocol_Explode_Temp
AS
SELECT
  explode(Items) AS Item,
  AuditSourceFile,
  AuditLoadTime
FROM STREAM(LIVE.vehicleInspectionProtocol_Raw);

CREATE TEMPORARY STREAMING LIVE VIEW VehicleInspectionProtocol_Flattened_Temp
AS
SELECT
  Item.Id,
  Item.RegistrationDate,
  Item.InspectionDateTime,
  Item.VehicleInspectionType,
  AuditSourceFile,
  AuditLoadTime
FROM STREAM(LIVE.VehicleInspectionProtocol_Explode_Temp);

CREATE OR REFRESH STREAMING LIVE TABLE VehicleInspectionProtocol;

APPLY CHANGES INTO LIVE.VehicleInspectionProtocol
FROM STREAM(LIVE.VehicleInspectionProtocol_Flattened_Temp)
KEYS (Id)
SEQUENCE BY AuditSourceFile;
Thanks,
Magnus
11-27-2023 04:11 AM
Hi @Magnus, it seems you're encountering an issue with schema evolution in your DLT pipeline using Auto Loader.
Let's explore how you can improve your notebook implementation.
Schema Inference and Evolution:
Auto Loader can automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced.
When first reading data, Auto Loader samples the first 50 GB or 1000 files (whichever limit is crossed first) to infer the schema. It stores the schema information in a directory called _schemas at the configured cloudFiles.schemaLocation.
For formats that don't encode data types (such as JSON and CSV), Auto Loader infers all columns as strings. For formats with a typed schema (like Parquet and Avro), it samples a subset of files and merges the schemas of the individual files.
To configure schema inference and evolution, specify a target directory for the option cloudFiles.schemaLocation. You can use the same directory you specify for the checkpointLocation. If you're using Delta Live Tables, Databricks manages schema location and other checkpoint information automatically.
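For reference, a bronze Auto Loader table in DLT SQL might look roughly like the sketch below. The landing path is a placeholder and the audit column definitions are guesses based on the column names in your views, not your actual notebook. Because this runs inside a DLT pipeline, cloudFiles.schemaLocation is not set; DLT manages it for you.

CREATE OR REFRESH STREAMING LIVE TABLE vehicleInspectionProtocol_Raw
AS SELECT
  *,
  _metadata.file_path AS AuditSourceFile,    -- source file for each record (assumed definition)
  current_timestamp() AS AuditLoadTime       -- ingestion timestamp (assumed definition)
FROM cloud_files(
  "/mnt/landing/vehicle-inspection/",        -- placeholder landing path
  "json",
  map("cloudFiles.inferColumnTypes", "true") -- infer typed columns instead of reading everything as strings
);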
Error Message: The [FIELD_NOT_FOUND] error means that RegistrationDate is not present in the schema of the Item struct as seen by VehicleInspectionProtocol_Flattened_Temp, so Item.RegistrationDate cannot be resolved.
Suggestions:
Explicit Schema Definition:
Instead of relying solely on schema inference, explicitly define the schema for your flattened view (VehicleInspectionProtocol_Flattened_Temp). That way you can ensure that all expected fields are always present, even in data that predates them (a sketch follows below this list).
Define the schema using StructType and specify the data types for each field.
Check Data Consistency:
Review and Debug:
Remember that schema evolution can be complex, especially when dealing with semi-structured data. Explicitly defining the schema and verifying data consistency will help you avoid unexpected errors during processing.
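To make the explicit-schema suggestion concrete in SQL (a DLT SQL notebook has no StructType), one option is to re-serialize each Item with to_json and re-parse it with from_json and an explicit DDL schema. Fields that are missing from older files then come back as NULL instead of failing resolution. This is only a sketch; the data types are assumptions you would adjust to your data:

CREATE TEMPORARY STREAMING LIVE VIEW VehicleInspectionProtocol_Flattened_Temp
AS
SELECT
  Parsed.Id,
  Parsed.RegistrationDate,
  Parsed.InspectionDateTime,
  Parsed.VehicleInspectionType,
  AuditSourceFile,
  AuditLoadTime
FROM (
  SELECT
    -- round-trip through JSON and parse against an explicit schema so that
    -- fields absent from the struct become NULL instead of raising FIELD_NOT_FOUND
    from_json(
      to_json(Item),
      'Id STRING, RegistrationDate TIMESTAMP, InspectionDateTime TIMESTAMP, VehicleInspectionType STRING'
    ) AS Parsed,
    AuditSourceFile,
    AuditLoadTime
  FROM STREAM(LIVE.VehicleInspectionProtocol_Explode_Temp)
);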
Good luck with your DLT pipeline!
11-28-2023 12:19 AM
I used a schema hint in the Auto Loader notebook to explicitly define the two fields that caused the error. Then it worked. Thanks @Kaniz_Fatma!
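For anyone finding this thread later, such a schema hint on the bronze Auto Loader table might look roughly like the sketch below. The path, data types, and field list are illustrative placeholders, not the actual notebook; the key part is the cloudFiles.schemaHints option declaring the Items element type so the late-arriving fields exist in the schema before any file contains them.

CREATE OR REFRESH STREAMING LIVE TABLE vehicleInspectionProtocol_Raw
AS SELECT *
FROM cloud_files(
  "/mnt/landing/vehicle-inspection/",  -- placeholder landing path
  "json",
  map(
    "cloudFiles.inferColumnTypes", "true",
    -- declare every Items field referenced downstream (including RegistrationDate),
    -- since a hint on a column can take precedence over the inferred type
    "cloudFiles.schemaHints",
      "Items ARRAY<STRUCT<Id: STRING, RegistrationDate: TIMESTAMP, InspectionDateTime: TIMESTAMP, VehicleInspectionType: STRING>>"
  )
);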