07-28-2022 11:44 AM
We are building a DLT pipeline and Auto Loader is handling schema evolution fine. However, further down the pipeline we are trying to load that streamed data into a new table with the apply_changes() function, and it doesn't seem to handle row updates with a new schema: during the "Setting up tables" phase it fails with an "org.apache.spark.sql.catalyst.parser.ParseException". The only explanation I can think of is that it doesn't like replacing a column field of type "Null" with "Struct".
Here is the code:
import dlt
from pyspark.sql import functions as F

@dlt.view(
    name="authenticators_stream"
)
@dlt.expect_all_or_drop({"valid_doc": "doc IS NOT NULL"})
def stream_table():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.useNotifications", "true")
            .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/********/mongo-data-queue-testing")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .option("multiline", "false")
            .option("cloudFiles.schemaHints", "_id STRING, ot STRING, ts TIMESTAMP, year INT, month INT, day INT")
            .load(json_path)  # json_path is defined earlier in the notebook
    )

dlt.create_streaming_live_table(
    name="authenticators_raw",
    spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"}
)

dlt.apply_changes(
    target="authenticators_raw",
    source="authenticators_stream",
    keys=["_id"],
    sequence_by=F.col("ts"),
    stored_as_scd_type=2
)
And here is the full error message:
org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near '<'(line 1, pos 6)
== SQL ==
struct<__v:bigint,_id:string,buttonlabel:string,company:string,configuration:struct<parameters:struct<company-id:string,cyberarkurl:string,duo-sso-url:string,email:string,google-oauth-url:string,login-success-text:string,login-url:string,microsofturl:string,okta-url:string,oktasubdomain:string,onelogin-url:string,password:string,payroll-cookies-wait-for-url:string,payroll-provider-selector:string,ping-identity-url:string,request-id:string,secureid-url:string,subdomain:string,target-computing-resources-url:string,username:string,usersname:string,wait-for-milliseconds-param-key:string,wait-for-xpath-after-navigate:string,workday-organization-group-name:string>>,connector:string,createdat:string,optional:boolean,updatedat:string>
------^^^
07-29-2022 02:54 AM
If you leave only the authenticators_stream table, does the code run OK?
07-29-2022 09:00 AM
Yes. Upon further inspection, the SQL generated by the apply_changes() call is not escaping column names containing "-". When we replaced that character, it worked. Feels like a Databricks bug.
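For anyone else who hits this: below is a minimal sketch of the kind of workaround we ended up with, renaming hyphenated struct fields before apply_changes(). The helper name and the per-column cast are illustrative, not the exact code from our pipeline:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField

# The parser rejects unquoted hyphens in identifiers, e.g.
#   spark.sql("SELECT CAST(NULL AS STRUCT<company-id: STRING>)")
# raises a ParseException, while the backtick-quoted form parses fine:
#   spark.sql("SELECT CAST(NULL AS STRUCT<`company-id`: STRING>)")

def sanitize_field_names(dtype):
    # Recursively replace "-" with "_" in struct field names.
    # Sketch only: arrays/maps of structs are not handled.
    if isinstance(dtype, StructType):
        return StructType([
            StructField(
                field.name.replace("-", "_"),
                sanitize_field_names(field.dataType),
                field.nullable,
            )
            for field in dtype.fields
        ])
    return dtype

# Inside the view, before returning the DataFrame (struct-to-struct casts
# are positional, so this adopts the sanitized names):
# df = df.withColumn(
#     "configuration",
#     F.col("configuration").cast(
#         sanitize_field_names(df.schema["configuration"].dataType)
#     ),
# )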
07-29-2022 11:15 AM
Ahhh, it is a Hive metastore limitation. It will be solved with the migration to Unity Catalog soon.
07-29-2022 11:23 AM
Really? That is great news. Do you know if it will also help with Auto Loader schema evolution? Our current pipeline runtime is ridiculously long because the cluster is forced to restart on every schema change detected.
07-29-2022 11:26 AM
Yes, it should solve that issue. It was mentioned at the last Data + AI Summit a month ago.
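In the meantime, if the restarts are being triggered by addNewColumns failing the stream whenever a new field shows up, one possible mitigation (at the cost of not evolving the table schema) is Auto Loader's "rescue" mode, which keeps the stream running and routes unexpected fields into the _rescued_data column. A sketch, reusing the options and json_path from your original view:

spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaEvolutionMode", "rescue") \
    .option("cloudFiles.schemaHints", "_id STRING, ot STRING, ts TIMESTAMP") \
    .load(json_path)

With "rescue" the inferred schema is never evolved, so the stream is not interrupted by new columns; you would then pull anything you need out of _rescued_data downstream.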
07-29-2022 11:46 AM
That would be great. I'm going to look for that video.
09-06-2022 04:48 AM
Hey there @Palani Thangaraj
Hope all is well! Just wanted to check in to see if you were able to resolve your issue. Would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!