01-02-2024 06:08 AM
Hello,
We are receiving DB CDC binlogs through Kafka and synchronizing tables in our OLAP system using the apply_changes function in Delta Live Tables (DLT). A month ago, a column was added to our table, but due to a type mismatch it is being stored as nulls. (We manage our schema statically.)
In our DLT pipeline, the bronze stage holds the raw binlog data, so no changes are needed there. However, we need to adjust the types in the silver and gold stages. The issue is that in continuous mode, selective refresh isn't supported, so we're constrained to a full refresh. Since Kafka's retention period is two weeks, a full refresh would lose any data older than that window.
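For context, the apply step looks roughly like this (a simplified sketch; the table names, key, and sequencing column are placeholders, not our exact code):

import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("silver_table")

dlt.apply_changes(
    target="silver_table",
    source="bronze_table",
    keys=["id"],               # primary key of the source table (placeholder)
    sequence_by=col("ts_ms")   # binlog event timestamp used to order changes (placeholder)
)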
What would be the best course of action in this situation?
Since re-snapshotting from CDC binlogs and Kafka takes a long time, I'm considering storing the snapshot data in S3, merging it into the DLT table, and then resuming CDC with DLT. However, I don't know whether it is safe to merge into a DLT table manually.
If anyone has had a similar experience, I would appreciate your help.
01-02-2024 07:20 AM
Hi @532664, I posted a "kind of" related post here. What I do is save the data from Kafka as Delta files, and that is my source of truth. Then I implement a medallion architecture on top of it, reading the raw data with cloudFiles from the DLT pipeline.
In this case, as the raw data is correct, you just need to run a selective refresh of the bronze tables and their dependencies. Selective refresh isn't supported in continuous mode, but you can stop the pipeline, switch it to triggered, run the selective refresh, and then change it back to continuous. It will be down for a while, but you can do that in a maintenance window.
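Roughly what I mean for the bronze layer (a minimal sketch; the path and format are just examples from my setup):

import dlt

@dlt.table
def bronze_table():
    # Read the raw Kafka dump persisted as files in object storage with Auto Loader.
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3://my-bucket/raw/my-topic/")  # example path
    )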
I do not know if these are best practices or not. What do you think?
01-02-2024 11:37 PM
Hi @jcozar Thanks for your opinion!
I agree with your suggestion and tried applying it. By switching to triggered mode, I was able to perform a selective refresh as you suggested.
After changing the struct schema, I refreshed the silver and gold tables. Unfortunately, it didn't work as expected and resulted in an error.
org.apache.spark.sql.AnalysisException: Failed to merge fields 'col' and 'col'. Failed to merge incompatible data types BooleanType and IntegerType
Seems like I need to ponder over this more.
Thanks so much for your help.
01-03-2024 12:17 AM
You are right. Since a single column contains all the historical data, its type must be the most general one, probably string, and then in silver you cast it to boolean and to integer as two different columns, or something like that.
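Something like this, as a rough sketch (the dataframe and column names are just examples):

from pyspark.sql.functions import col

# The raw column stays a string; silver derives one boolean and one integer view of it.
silver_df = (
    raw_df
    .withColumn("col_bool", col("col_raw").cast("boolean"))
    .withColumn("col_int", col("col_raw").cast("int"))
)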
If you don't mind telling me, which approach did you implement? A separate DLT pipeline to write from Kafka to an object store? A workflow (job run)? Or, if you are using Unity Catalog, are you saving the raw data in a Unity Catalog table? I am wondering about all these choices and I don't know which one would be better.
01-03-2024 03:22 AM - edited 01-03-2024 03:23 AM
@jcozar
I'm using Kafka directly as a source in DLT. The bronze table is set up to receive binlog data from Kafka as follows:
import dlt

@dlt.table
def bronze_table():
    # Read the raw Debezium binlog events from the Kafka topic.
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka)
        .option("subscribe", topic)
        .load()
    )
This data is received in a format with an 'after' section containing column and value information, like:
(This format comes from Debezium: https://debezium.io/documentation/reference/stable/connectors/mysql.html)
{ "before": null, "after": { "col1": "hello", "col2": "world", ... "col10": 0 }, ... }
To create the silver table from the 'after' JSON in the bronze table, I use a static schema with the from_json function. Initially, the schema had the wrong type for one column, leading to incorrect data types and null values (col10 in the example). I changed the schema passed to from_json and performed a selective refresh.
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType

# wrong schema
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    # ...
    StructField('col10', BooleanType(), True)
])

# edited schema
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    # ...
    StructField('col10', IntegerType(), True)
])

@dlt.table
def silver_table():
    # Parse the Debezium 'after' payload with the static schema.
    return (
        dlt.read_stream("bronze_table")
        .withColumn("after", from_json(col("after"), schema))
    )
Although we're not using Unity Catalog tables, our bronze table still retains all the information.
Let me know if you need more specific information!
01-03-2024 08:51 AM
Thank you @532664 for your detailed response! That seems to me a very good solution, and it also helps me with my doubts.
01-03-2024 05:59 PM
I'm glad I could help, but unfortunately, the issue wasn't resolved with that method.
As I mentioned, I encountered an error after making the change:
org.apache.spark.sql.AnalysisException: Failed to merge fields 'col10' and 'col10'. Failed to merge incompatible data types BooleanType and IntegerType.
It looks like I'll need to start from scratch and re-import all the data from the database using Kafka and a full refresh in DLT (which creates new metadata initially).
Thank you once again for all your help and insights. Have a great day!
01-04-2024 12:37 AM
I think you might still be able to solve the problem. As far as I understand, your bronze data is correct, since the data is in JSON format (string or map/record type?). The problem is in silver when you apply the static schema, because col10 is a mixture of BooleanType and IntegerType.
Can you try setting StringType for col10 in the static schema, and then creating a new column "col10_processed" that casts it to integer, or something like that?
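Something along these lines (just a sketch of the idea; raw_df is a placeholder for your parsed stream):

from pyspark.sql.functions import col, when

# col10 stays a string in the static schema; col10_processed carries the integer value,
# mapping the old boolean-era values explicitly before the cast.
processed_df = (
    raw_df
    .withColumn(
        "col10_processed",
        when(col("after.col10") == "true", 1)
        .when(col("after.col10") == "false", 0)
        .otherwise(col("after.col10").cast("int"))
    )
)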
01-05-2024 01:03 AM
@jcozar You've understood the situation accurately.
Am I correct in understanding that your opinion is, "setting the schema to integer didn't work, but changing it to string might"?
Is this possible because strings are a more versatile type?
If I've misunderstood, let me know.
I really appreciate your response. Thanks a lot!
01-05-2024 01:14 AM
That's it. I am not totally sure, because if type 1 was boolean and type 2 is integer, the second is more general and should work too. If the bronze data does not define types for col10 (which I assume it does not, because there are two different types for the same column), you should be able to specify a single type for col10 so it works (you will need to refresh the silver table, of course).
Let me know if it works! I hope so.
01-10-2024 07:20 AM
Sorry for getting back to you late.
I tried using StringType for col10, but unfortunately, it didn't work due to an error:
"Failed to merge incompatible data types BooleanType and StringType."
However, I've found that by setting the pipelines.reset.allowed table property to false on the DLT table, I'm able to process the downstream (silver) table without data loss.
Additionally, applying this setting to the bronze table allows for a full refresh without any data loss.
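For reference, it's just a table property on the table definition (a minimal sketch, using the bronze table as the example):

import dlt

@dlt.table(
    table_properties={"pipelines.reset.allowed": "false"}  # exclude this table from full-refresh resets
)
def bronze_table():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka)
        .option("subscribe", topic)
        .load()
    )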
If you're interested in this feature, I highly recommend exploring it further.
Thanks a lot!
01-10-2024 10:39 AM
Hi @532664, thank you for your response! I will check the pipelines.reset.allowed option, that is interesting!