Handling Changing Schema in CDC DLT
07-28-2022 11:44 AM
We are building a DLT pipeline, and Auto Loader is handling schema evolution fine. Further down the pipeline, however, we load that streamed data into a new table with the apply_changes() function, and it does not seem to handle row updates that arrive with a new schema: during "Setting Up Tables" the pipeline fails with an "org.apache.spark.sql.catalyst.parser.ParseException" error. The only explanation I can think of is that it doesn't like replacing a column of type "Null" with one of type "Struct".
Here is the code:
import dlt
from pyspark.sql import functions as F

# Streaming view over the raw JSON files picked up by Auto Loader.
# json_path is not shown in this snippet.
@dlt.view(
    name = "authenticators_stream"
)
@dlt.expect_all_or_drop({"valid_doc": "doc IS NOT NULL"})
def stream_table():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.useNotifications", "true")
        .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/********/mongo-data-queue-testing")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("multiline", "false")
        .option("cloudFiles.schemaHints", "_id STRING, ot STRING, ts TIMESTAMP, year INT, month INT, day INT")
        .load(json_path)
    )

# Target table for the CDC merge.
dlt.create_streaming_live_table(
    name = "authenticators_raw",
    spark_conf = {"spark.databricks.delta.schema.autoMerge.enabled": "true"}
)

# Apply changes from the view into the target as SCD type 2, keyed on _id and ordered by ts.
dlt.apply_changes(
    target = "authenticators_raw",
    source = "authenticators_stream",
    keys = ["_id"],
    sequence_by = F.col("ts"),
    stored_as_scd_type = 2
)

And here is the full error message:
org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near '<'(line 1, pos 6)
== SQL ==
struct<__v:bigint,_id:string,buttonlabel:string,company:string,configuration:struct<parameters:struct<company-id:string,cyberarkurl:string,duo-sso-url:string,email:string,google-oauth-url:string,login-success-text:string,login-url:string,microsofturl:string,okta-url:string,oktasubdomain:string,onelogin-url:string,password:string,payroll-cookies-wait-for-url:string,payroll-provider-selector:string,ping-identity-url:string,request-id:string,secureid-url:string,subdomain:string,target-computing-resources-url:string,username:string,usersname:string,wait-for-milliseconds-param-key:string,wait-for-xpath-after-navigate:string,workday-organization-group-name:string>>,connector:string,createdat:string,optional:boolean,updatedat:string>
------^^^
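One thing that might be worth ruling out, besides the Null-to-Struct theory: the type string in the error contains nested field names with hyphens (for example company-id), and as far as I know unquoted Spark SQL identifiers are limited to letters, digits, and underscores. This is not a confirmed diagnosis, only a rough sketch for listing the field names in such a type string that would need backtick quoting. The helper name fields_needing_quotes and the shortened schema string are made up for illustration.

```python
import re

# Assumption: an unquoted Spark SQL identifier is made of letters, digits, and underscores.
_UNQUOTED_IDENTIFIER = re.compile(r"^[A-Za-z0-9_]+$")


def fields_needing_quotes(type_string):
    """Return struct field names in a type string that are not plain identifiers."""
    # A field name is taken to be the text between '<' or ',' and the next ':'.
    names = re.findall(r"[<,]\s*([^<>,:]+?)\s*:", type_string)
    return [name for name in names if not _UNQUOTED_IDENTIFIER.match(name)]


# Shortened, hypothetical version of the type string from the error message.
schema = (
    "struct<_id:string,configuration:struct<parameters:struct<"
    "company-id:string,email:string,login-url:string>>,optional:boolean>"
)

print(fields_needing_quotes(schema))
```

If the list comes back non-empty, renaming those fields upstream (or in the view, before apply_changes()) would be one way to test whether the hyphens are related to the failure.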
Labels: Autoloader, Delta Live Tables, DLT