
Handling Changing Schema in CDC DLT

pmt
New Contributor III

We are building a DLT pipeline, and Auto Loader is handling schema evolution fine. However, further down the pipeline we load that streamed data into a new table with the apply_changes() function, and it does not seem to handle row updates that arrive with a new schema: during "Setting Up Tables" it fails with an "org.apache.spark.sql.catalyst.parser.ParseException" error. The only explanation I can think of is that it doesn't like replacing a column field of type "Null" with "Struct".

Here is the code:

    import dlt
    from pyspark.sql import functions as F

    @dlt.view(
      name = "authenticators_stream"
    )
    @dlt.expect_all_or_drop({"valid_doc": "doc IS NOT NULL"})
    def stream_table():
        # Auto Loader stream over JSON files, discovered through SQS notifications.
        # json_path is defined elsewhere in the notebook.
        return (
            spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.useNotifications", "true")
                .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/********/mongo-data-queue-testing")
                .option("cloudFiles.includeExistingFiles", "true")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.inferColumnTypes", "true")
                .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
                .option("multiline", "false")
                .option("cloudFiles.schemaHints", "_id STRING, ot STRING, ts TIMESTAMP, year INT, month INT, day INT")
                .load(json_path)
        )
 
    dlt.create_streaming_live_table(
        name = "authenticators_raw",
        spark_conf = {"spark.databricks.delta.schema.autoMerge.enabled": "true"}
    )
 
    dlt.apply_changes(
      target = "authenticators_raw",
      source = "authenticators_stream",
      keys = ["_id"],
      sequence_by = F.col("ts"),
      stored_as_scd_type = 2
    )

And here is the full error message:

org.apache.spark.sql.catalyst.parser.ParseException:

[PARSE_SYNTAX_ERROR] Syntax error at or near '<'(line 1, pos 6)

== SQL ==

struct<__v:bigint,_id:string,buttonlabel:string,company:string,configuration:struct<parameters:struct<company-id:string,cyberarkurl:string,duo-sso-url:string,email:string,google-oauth-url:string,login-success-text:string,login-url:string,microsofturl:string,okta-url:string,oktasubdomain:string,onelogin-url:string,password:string,payroll-cookies-wait-for-url:string,payroll-provider-selector:string,ping-identity-url:string,request-id:string,secureid-url:string,subdomain:string,target-computing-resources-url:string,username:string,usersname:string,wait-for-milliseconds-param-key:string,wait-for-xpath-after-navigate:string,workday-organization-group-name:string>>,connector:string,createdat:string,optional:boolean,updatedat:string>

------^^^

7 REPLIES

Hubert-Dudek
Esteemed Contributor III

If you leave only the authenticators_stream table, is the code running ok?

pmt
New Contributor III

Yes. Upon further inspection, the SQL generated in the apply_changes() call is not escaping column names that contain "-". When we replaced that character, it worked. Feels like a Databricks bug.
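For anyone hitting the same error, here is a minimal sketch of the rename step, not the exact code from our pipeline. It works on the JSON form of a Spark schema (the dict that `df.schema.jsonValue()` returns) and replaces "-" with "_" in every field name, including nested ones. The function name `sanitize_schema` and the sample schema are made up for illustration.

```python
import json


def sanitize_schema(dtype, bad="-", good="_"):
    """Return a copy of a Spark schema (JSON/dict form) with `bad` replaced
    by `good` in every field name, at any nesting depth."""
    if not isinstance(dtype, dict):
        # Primitive types are plain strings such as "string" or "long".
        return dtype
    kind = dtype.get("type")
    if kind == "struct":
        return {
            **dtype,
            "fields": [
                {
                    **field,
                    "name": field["name"].replace(bad, good),
                    "type": sanitize_schema(field["type"], bad, good),
                }
                for field in dtype["fields"]
            ],
        }
    if kind == "array":
        return {**dtype, "elementType": sanitize_schema(dtype["elementType"], bad, good)}
    if kind == "map":
        return {
            **dtype,
            "keyType": sanitize_schema(dtype["keyType"], bad, good),
            "valueType": sanitize_schema(dtype["valueType"], bad, good),
        }
    return dtype


# Hypothetical, cut-down version of the schema from the error message.
example = {
    "type": "struct",
    "fields": [
        {"name": "_id", "type": "string", "nullable": True, "metadata": {}},
        {
            "name": "configuration",
            "type": {
                "type": "struct",
                "fields": [
                    {"name": "company-id", "type": "string", "nullable": True, "metadata": {}},
                    {"name": "okta-url", "type": "string", "nullable": True, "metadata": {}},
                ],
            },
            "nullable": True,
            "metadata": {},
        },
    ],
}

cleaned = sanitize_schema(example)
print(json.dumps(cleaned, indent=2))
```

In the pipeline, the cleaned dict could be turned back into a schema with `StructType.fromJson(...)` and the struct column cast to that type inside the view, on the assumption that Spark's struct-to-struct cast matches fields by position. Two names that differ only by "-" versus "_" would collide after the rename, so check for that first.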

Hubert-Dudek
Esteemed Contributor III

Ahh, it is a Hive metastore limitation. It should be solved by the migration to Unity Catalog soon.

pmt
New Contributor III

Really? That is great news. Do you know if it will also help with Auto Loader schema evolution? Our current pipeline runtime is ridiculously long because the cluster is forced to restart every time a schema change is detected.

Hubert-Dudek
Esteemed Contributor III

Yes, it should solve that issue. It was mentioned at the last Data+AI conference a month ago.

pmt
New Contributor III

That would be great. I'm going to look for that video.

Vidula
Honored Contributor

Hey there @Palani Thangaraj

Hope all is well! Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
