cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Replay(backfill) DLT CDC using kafka

532664
New Contributor III

Hello,

We are receiving DB CDC binlogs through Kafka and synchronizing tables in OLAP system using the apply_changes function in Delta Live Table (DLT). A month ago, a column was added to our table, but due to a type mismatch, it's being stored incorrectly as nulls. (We manage our schema statically.)

In our DLT pipeline, the bronze stage holds the raw binlog data, so no changes are needed there. However, we need to adjust the types in the silver and gold stages. The issue is that in continuous mode, selective refresh isn't supported, and we're constrained to a full refresh. Given that Kafka's retention period is two weeks, a full refresh might lead to the loss of existing data.

What would be the best course of action in this situation?

Since it takes a long time to get the snapshot data from CDC binlogs and kafka, I'm thinking of storing the snapshot data in s3 and merging it into a DLT table, and then continuing the CDC with DLT. However, I don't know if it is safe to manually merge into the DLT table.

If anyone has had a similar experience, I would appreciate your help.

 

1 ACCEPTED SOLUTION

Accepted Solutions

Hi @532664 thank you for your response! I will check the pipelines.reset.allowed option, that is interesting! ๐Ÿ™‚

View solution in original post

11 REPLIES 11

jcozar
Contributor

Hi @532664 , I posted a "kind of" related post here. What I am doing is to save data from kafka as delta files, and this is my source of truth. Then I implement a medallion architecture from it, reading the raw data as cloudfiles from DLT pipeline.

In this case, as the raw data is correct, you just need to make a selective refresh to the bronze tables and its dependencies. I think that selective refresh isn't supported in continuous mode, but you can stop it, update to triggered, make the selective refresh, and then change to continuous again. It will be down for a while, but you can do that in a maintenance window.

I do not know if these are best practices or not. What do you think?

 

532664
New Contributor III

Hi @jcozar Thanks for your opinion!


I agree with your suggestion and tried applying it. By switching to triggered mode, I was able to perform a selected refresh as you told.

After changing the struct schema value, I refreshed the silver and gold tables. Unfortunately, it didn't work as expected and resulted in an error.

org.apache.spark.sql.AnalysisException: Failed to merge fields 'col' and 'col'. Failed to merge incompatible data types BooleanType and IntegerType

Seems like I need to ponder over this more.

Thanks so much for your help.

You are right. As a single column contains all the history data, the type must be the most general. Probably string, and then you cast it in silver to boolean and to Integer as two different columns or something like that.

If you don't mind to tell me, which approach did you implement? A separate DLT pipeline to write from Kafka to an object store? A workflow (job run)? Or If you are using Unity Catalog, are you saving raw data in a Unity Catalog table? I am wondering about all these choices and I don't know which one would be better ๐Ÿ™‚

532664
New Contributor III

@jcozar 
I'm using Kafka directly as a source in DLT. The bronze table is set up to receive binlogs data from Kafka as follows:

 

@dlt.table
def bronze_table():
    spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka) \
        .option("subscribe", topic) \
        .load()

 

This data is received in a format with an 'after' section containing column and value information, like:

(This format from debezium, https://debezium.io/documentation/reference/stable/connectors/mysql.html)

 

{ "before": null, "after": { "col1": "hello", "col2": "world", ... "col10": 0 }, ... }

 

To create silver table from the 'after' JSON in bronze table, I've used a schema with the from_json function. Initially, the schema had issues, leading to incorrect data types and null values. (col10 in example) I've changed the schema in from_json and performed a selected refresh.

 

# wrong schema
schema = StructType([ \
    StructField('col1', StringType(), True), \
    StructField('col2', StringType(), True), \
    ....
    StructField('col10', BooleanType(), True)
 ])

# edited schema 
schema = StructType([ \
    StructField('col1', StringType(), True), \
    StructField('col2', StringType(), True), \
    ....
    StructField('col10', IntegerType(), True)
 ])


@dlt.table
def silver_table():
    dlt.read_stream("bronze_table").withColumn("after", from_json(col("after"), schema))

 

Although we're not using Unity Catalog tables, our bronze table still retains all the information.

Let me know if you need more specific information!

jcozar
Contributor

Thank you @532664 for your detailed response! That's seems to me a very good solution, and it also helps me with my doubts ๐Ÿ™‚

532664
New Contributor III

@jcozar 

I'm glad I could help, but unfortunately, the issue wasn't resolved with that method.

As I mentioned, I encountered an error after making the change:

org.apache.spark.sql.AnalysisException: Failed to merge fields 'col10' and 'col10'. Failed to merge incompatible data types BooleanType and IntegerType.

It looks like I'll need to start from scratch and re-import all the data from the database using Kafka and a full refresh in DLT (which creates new metadata initially).

Thank you once again for all your help and insights. Have a great day!

I think you might solve the problem. As far as I understood, your bronze data is correct, as data is in json format (string or map/record type?). Then, the problem is in silver when you apply the static schema, because col10 is a mixture of BooleanType and IntegerType.

Can you try to set StringType for col10 in the static schema, and then create a new column "col10_processed" which cast to Integer or something like that?

532664
New Contributor III

@jcozar You've understood the situation accurately.

  • The bronze data is correct.
  • The problem arises in the silver layer when applying the static schema because the value type of col10 is an integer, but the schema is boolean. So, changing the schema from boolean to integer seemed logical.
  • However, due to DLT table's metadata discrepancies, errors occurred.

Am I corrected in understanding that your opinion is "It seems that setting the schema to integer didn't work, but changing it to string might."? 

Is this possible because strings are a more versatile type?

If I understand wrong, let me know.

I'm really appreciative of your response. Thank a lot!

That's it. I am not totally sure, because if type 1 was boolean and type 2 is integer, the second is more general and should work too. If bronze data does not define types for col10 (which I assume not, because there are two different types for the same column), you should be able to specify a single type for col10 so it works (you will need to make a refresh of silver table of course).

Let me know it it works! I hope so ๐Ÿ™‚

532664
New Contributor III

@jcozar 

Sorry for getting back to you late.

I tried using StringType for col10, but unfortunately, it didn't work due to an error:

"Failed to merge incompatible data types BooleanType and StringType."

However, I've found that by setting the pipelines.reset.allowed option to false in the DLT table, I'm able to process the downstream (silver) table without data loss.

Additionally, applying this setting to the bronze table allows for a full refresh without any data loss.

If you're interested in this feature, I highly recommend exploring it further.

Thanks a lot!

Hi @532664 thank you for your response! I will check the pipelines.reset.allowed option, that is interesting! ๐Ÿ™‚

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!