Data Engineering

Handling Changing Schema in CDC DLT

pmt
New Contributor III

We are building a DLT pipeline and Auto Loader handles schema evolution fine. Further down the pipeline, however, we load that streamed data into a new table with the apply_changes() function, and it doesn't seem to handle row updates that arrive with a new schema: during "Setting Up Tables" the pipeline fails with an "org.apache.spark.sql.catalyst.parser.ParseException" error. The only explanation I can think of is that it doesn't like replacing a column field of type "Null" with a "Struct".

Here is the code:

    import dlt
    from pyspark.sql import functions as F

    # Stream the raw JSON change events with Auto Loader; schema hints cover
    # the key and sequencing columns, and new columns are added as they appear.
    @dlt.view(
        name = "authenticators_stream"
    )
    @dlt.expect_all_or_drop({"valid_doc": "doc IS NOT NULL"})
    def stream_table():
        return (
            spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.useNotifications", "true")
                .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/********/mongo-data-queue-testing")
                .option("cloudFiles.includeExistingFiles", "true")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.inferColumnTypes", "true")
                .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
                .option("multiline", "false")
                .option("cloudFiles.schemaHints", "_id STRING, ot STRING, ts TIMESTAMP, year INT, month INT, day INT")
                .load(json_path))

    # Target table for the CDC feed, with automatic schema merging enabled.
    dlt.create_streaming_live_table(
        name = "authenticators_raw",
        spark_conf = {"spark.databricks.delta.schema.autoMerge.enabled": "true"}
    )

    # Apply the change feed as SCD type 2, keyed on _id and ordered by ts.
    dlt.apply_changes(
        target = "authenticators_raw",
        source = "authenticators_stream",
        keys = ["_id"],
        sequence_by = F.col("ts"),
        stored_as_scd_type = 2
    )

And here is the full error message:

org.apache.spark.sql.catalyst.parser.ParseException:

[PARSE_SYNTAX_ERROR] Syntax error at or near '<'(line 1, pos 6)

== SQL ==

struct<__v:bigint,_id:string,buttonlabel:string,company:string,configuration:struct<parameters:struct<company-id:string,cyberarkurl:string,duo-sso-url:string,email:string,google-oauth-url:string,login-success-text:string,login-url:string,microsofturl:string,okta-url:string,oktasubdomain:string,onelogin-url:string,password:string,payroll-cookies-wait-for-url:string,payroll-provider-selector:string,ping-identity-url:string,request-id:string,secureid-url:string,subdomain:string,target-computing-resources-url:string,username:string,usersname:string,wait-for-milliseconds-param-key:string,wait-for-xpath-after-navigate:string,workday-organization-group-name:string>>,connector:string,createdat:string,optional:boolean,updatedat:string>

------^^^


Hubert-Dudek
Esteemed Contributor III

If you leave only the authenticators_stream view in the pipeline, does the code run OK?

pmt
New Contributor III

Yes. Upon further inspection, the SQL generated by the apply_changes() call is not escaping column names that contain "-". When we replaced that character it worked. Feels like a Databricks bug.
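
For anyone hitting the same thing: the idea is to rename the offending columns before apply_changes() generates SQL against them. A rough sketch of that approach (the sanitized view name is made up, and it only renames top-level columns; nested struct fields like the ones under configuration.parameters would need to be rebuilt as well):

    import dlt

    # Rough sketch: swap "-" for "_" in top-level column names so the SQL
    # generated by apply_changes() no longer contains unescaped hyphens.
    @dlt.view(name = "authenticators_stream_sanitized")
    def authenticators_stream_sanitized():
        df = dlt.read_stream("authenticators_stream")
        for col_name in df.columns:
            if "-" in col_name:
                df = df.withColumnRenamed(col_name, col_name.replace("-", "_"))
        return df

apply_changes() then takes the sanitized view as its source instead of the raw stream.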

Hubert-Dudek
Esteemed Contributor III

Ahh, it is a Hive metastore limitation. It will be solved with the migration to Unity Catalog soon.

pmt
New Contributor III

Really? That is great news. Do you know if it will also help with Auto Loader schema evolution? Our current pipeline runtime is ridiculously long because the cluster is forced to restart on every schema change it detects.
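
In the meantime, one trade-off we are considering (not a fix) is switching Auto Loader to rescue mode, so unexpected fields land in _rescued_data instead of triggering a schema update and a stream restart. A sketch, keeping the rest of the reader options as in the original pipeline:

    # Sketch: "rescue" mode never evolves the schema, so the stream keeps
    # running when new fields appear; they are captured in the _rescued_data
    # column instead of becoming new top-level columns.
    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .option("cloudFiles.schemaHints", "_id STRING, ot STRING, ts TIMESTAMP, year INT, month INT, day INT")
        .load(json_path))

The downside is that new columns stop being added to the table automatically, so this only works if we can tolerate pulling them out of _rescued_data later.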

Hubert-Dudek
Esteemed Contributor III

Yes, it should solve that issue. It was mentioned at the last Data + AI Summit a month ago.

pmt
New Contributor III

That would be great. I'm going to look for that video.

Vidula
Honored Contributor

Hey there @Palani Thangaraj

Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark a reply as the best answer? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
