<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Replay(backfill) DLT CDC using kafka in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56304#M30512</link>
    <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/55846"&gt;@532664&lt;/a&gt;, I posted a somewhat related thread &lt;A href="https://community.databricks.com/t5/data-engineering/cdc-and-raw-data/td-p/56080" target="_self"&gt;here&lt;/A&gt;. What I do is save the data from Kafka as Delta files, and this is my source of truth. Then I implement a medallion architecture on top of it, reading the raw data as cloudFiles from a DLT pipeline.&lt;/P&gt;&lt;P&gt;In this case, as the raw data is correct, you just need to run a selective refresh of the bronze tables and their dependencies. I think that&amp;nbsp;&lt;SPAN&gt;selective refresh isn't supported in continuous mode, but you can stop the pipeline, switch it to triggered mode, run the selective refresh, and then switch back to continuous. It will be down for a while, but you can do that in a maintenance window.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I do not know whether these are best practices. What do you think?&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 02 Jan 2024 15:20:52 GMT</pubDate>
    <dc:creator>jcozar</dc:creator>
    <dc:date>2024-01-02T15:20:52Z</dc:date>
    <item>
      <title>Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56247#M30510</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;We receive DB CDC binlogs through Kafka and synchronize tables in our OLAP system using the apply_changes function in Delta Live Tables (DLT). A month ago, a column was added to our table, but due to a type mismatch, its values are being stored incorrectly as nulls. (We manage our schema statically.)&lt;/P&gt;&lt;P&gt;In our DLT pipeline, the bronze stage holds the raw binlog data, so no changes are needed there. However, we need to adjust the types in the silver and gold stages. The issue is that in continuous mode, selective refresh isn't supported, and we're constrained to a full refresh. Given that Kafka's retention period is two weeks, a full refresh might lead to the loss of existing data.&lt;/P&gt;&lt;P&gt;What would be the best course of action in this situation?&lt;/P&gt;&lt;P&gt;Since it takes a long time to get the snapshot data from the CDC binlogs and Kafka, I'm thinking of storing the snapshot data in S3, merging it into a DLT table, and then continuing the CDC with DLT. However, I don't know whether it is safe to merge manually into a DLT table.&lt;BR /&gt;&lt;BR /&gt;If anyone has had a similar experience, I would appreciate your help.&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jan 2024 14:08:12 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56247#M30510</guid>
      <dc:creator>532664</dc:creator>
      <dc:date>2024-01-02T14:08:12Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56304#M30512</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/55846"&gt;@532664&lt;/a&gt;, I posted a somewhat related thread &lt;A href="https://community.databricks.com/t5/data-engineering/cdc-and-raw-data/td-p/56080" target="_self"&gt;here&lt;/A&gt;. What I do is save the data from Kafka as Delta files, and this is my source of truth. Then I implement a medallion architecture on top of it, reading the raw data as cloudFiles from a DLT pipeline.&lt;/P&gt;&lt;P&gt;In this case, as the raw data is correct, you just need to run a selective refresh of the bronze tables and their dependencies. I think that&amp;nbsp;&lt;SPAN&gt;selective refresh isn't supported in continuous mode, but you can stop the pipeline, switch it to triggered mode, run the selective refresh, and then switch back to continuous. It will be down for a while, but you can do that in a maintenance window.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I do not know whether these are best practices. What do you think?&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jan 2024 15:20:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56304#M30512</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-01-02T15:20:52Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56336#M30525</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/24595"&gt;@jcozar&lt;/a&gt;&amp;nbsp;Thanks for your opinion!&lt;/P&gt;&lt;P&gt;I agree with your suggestion and tried applying it. By switching to triggered mode, I was able to perform a selective refresh as you suggested.&lt;/P&gt;&lt;P&gt;After changing the type in the struct schema, I refreshed the silver and gold tables. Unfortunately, it didn't work as expected and resulted in an error.&lt;/P&gt;&lt;P&gt;org.apache.spark.sql.AnalysisException: Failed to merge fields 'col' and 'col'. Failed to merge incompatible data types BooleanType and IntegerType&lt;/P&gt;&lt;P&gt;It seems I need to think this over some more.&lt;/P&gt;&lt;P&gt;Thanks so much for your help.&lt;/P&gt;</description>
      <pubDate>Wed, 03 Jan 2024 07:37:15 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56336#M30525</guid>
      <dc:creator>532664</dc:creator>
      <dc:date>2024-01-03T07:37:15Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56338#M30526</link>
      <description>&lt;P&gt;You are right. Since a single column holds all the historical data, its type must be the most general one, probably string. You can then cast it in silver to boolean and to integer as two separate columns, or something like that.&lt;/P&gt;&lt;P&gt;If you don't mind telling me, which approach did you implement? A separate DLT pipeline to write from Kafka to an object store? A workflow (job run)? Or, if you are using Unity Catalog, are you saving the raw data in a Unity Catalog table? I am wondering about all these choices and I don't know which one would be better &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 03 Jan 2024 08:17:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56338#M30526</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-01-03T08:17:11Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56350#M30532</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/24595"&gt;@jcozar&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;&lt;SPAN&gt;I'm using Kafka directly as a source in DLT. The bronze table is set up to receive binlog data from Kafka as follows:&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import dlt

@dlt.table
def bronze_table():
    return spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka) \
        .option("subscribe", topic) \
        .load()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;The data arrives in a format with an 'after' section containing column and value information, like:&lt;/P&gt;&lt;P&gt;(This format comes from Debezium:&amp;nbsp;&lt;A href="https://debezium.io/documentation/reference/stable/connectors/mysql.html" target="_blank" rel="noopener"&gt;https://debezium.io/documentation/reference/stable/connectors/mysql.html&lt;/A&gt;)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;{ "before": null, "after": { "col1": "hello", "col2": "world", ... "col10": 0 }, ... }&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;To create the silver table from the 'after' JSON in the bronze table, I've used a schema with the &lt;/SPAN&gt;from_json&lt;SPAN&gt; function. Initially, the schema had issues, leading to incorrect data types and null values (col10 in the example). I've since changed the schema in &lt;/SPAN&gt;from_json&lt;SPAN&gt; and performed a selective refresh.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql.functions import col, from_json
from pyspark.sql.types import BooleanType, IntegerType, StringType, StructField, StructType

# wrong schema
schema = StructType([ \
    StructField('col1', StringType(), True), \
    StructField('col2', StringType(), True), \
    ....
    StructField('col10', BooleanType(), True)
 ])

# edited schema 
schema = StructType([ \
    StructField('col1', StringType(), True), \
    StructField('col2', StringType(), True), \
    ....
    StructField('col10', IntegerType(), True)
 ])


@dlt.table
def silver_table():
    return dlt.read_stream("bronze_table").withColumn("after", from_json(col("after"), schema))&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Although we're not using Unity Catalog tables, our bronze table still retains all the information.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Let me know if you need more specific information!&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 03 Jan 2024 11:23:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56350#M30532</guid>
      <dc:creator>532664</dc:creator>
      <dc:date>2024-01-03T11:23:37Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56360#M30533</link>
      <description>&lt;P&gt;Thank you&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/55846"&gt;@532664&lt;/a&gt;&amp;nbsp;for your detailed response! That seems like a very good solution to me, and it also helps clear up my doubts &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 03 Jan 2024 16:51:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56360#M30533</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-01-03T16:51:47Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56395#M30541</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/24595"&gt;@jcozar&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I'm glad I could help, but unfortunately, the issue wasn't resolved with that method.&lt;/P&gt;&lt;P&gt;As I mentioned, I encountered an error after making the change:&lt;/P&gt;&lt;P&gt;org.apache.spark.sql.AnalysisException: Failed to merge fields 'col10' and 'col10'. Failed to merge incompatible data types BooleanType and IntegerType.&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;It looks like I'll need to start from scratch and re-import all the data from the database using Kafka and a full refresh in DLT (which creates new metadata initially).&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;Thank you once again for all your help and insights. Have a great day!&lt;/P&gt;</description>
      <pubDate>Thu, 04 Jan 2024 01:59:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56395#M30541</guid>
      <dc:creator>532664</dc:creator>
      <dc:date>2024-01-04T01:59:02Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56411#M30547</link>
      <description>&lt;P&gt;I think you might be able to solve the problem. As far as I understand, your bronze data is correct, as the data is in JSON format (string or map/record type?). So the problem is in silver when you apply the static schema, because col10 is a mixture of&amp;nbsp;&lt;FONT color="#333333"&gt;BooleanType and IntegerType.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT color="#333333"&gt;Can you try setting StringType for col10 in the static schema, and then creating a new column "col10_processed" that casts it to Integer, or something like that?&lt;/FONT&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 04 Jan 2024 08:37:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56411#M30547</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-01-04T08:37:13Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56466#M30564</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/24595"&gt;@jcozar&lt;/a&gt;&amp;nbsp;You've understood the situation accurately.&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;The bronze data is correct.&lt;/LI&gt;&lt;LI&gt;The problem arises in the silver layer when applying the static schema, because the values of col10 are integers but the schema says boolean. So, changing the schema from boolean to integer seemed logical.&lt;/LI&gt;&lt;LI&gt;However, errors occurred because of discrepancies with the DLT table's metadata.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Am I correct in understanding that your opinion is "It seems that setting the schema to integer didn't work, but changing it to string might"?&lt;/P&gt;&lt;P&gt;Is this possible because string is a more versatile type?&lt;/P&gt;&lt;P&gt;If I have misunderstood, let me know.&lt;/P&gt;&lt;P&gt;I really appreciate your response. Thanks a lot!&lt;/P&gt;</description>
      <pubDate>Fri, 05 Jan 2024 09:03:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56466#M30564</guid>
      <dc:creator>532664</dc:creator>
      <dc:date>2024-01-05T09:03:18Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56467#M30565</link>
      <description>&lt;P&gt;That's it. I am not totally sure, though, because if the first type was boolean and the second is integer, the second is more general and should work too. If the bronze data does not define a type for col10 (which I assume it doesn't, because there are two different types for the same column), you should be able to specify a single type for col10 so that it works (you will need to refresh the silver table, of course).&lt;/P&gt;&lt;P&gt;Let me know if it works! I hope so &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 05 Jan 2024 09:14:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56467#M30565</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-01-05T09:14:49Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56872#M30671</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/24595"&gt;@jcozar&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Sorry for getting back to you late.&lt;/P&gt;&lt;P&gt;I tried using StringType for col10, but unfortunately, it didn't work due to an error:&lt;/P&gt;&lt;P&gt;"Failed to merge incompatible data types BooleanType and StringType."&lt;/P&gt;&lt;P&gt;However, I've found that by setting the pipelines.reset.allowed table property to false on the DLT table, I'm able to process the downstream (silver) table without data loss.&lt;/P&gt;&lt;P&gt;Additionally, applying this setting to the bronze table allows for a full refresh without any data loss.&lt;/P&gt;&lt;P&gt;If you're interested in this feature, I highly recommend exploring it further.&lt;/P&gt;&lt;P&gt;Thanks a lot!&lt;/P&gt;</description>
      <pubDate>Wed, 10 Jan 2024 15:20:27 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56872#M30671</guid>
      <dc:creator>532664</dc:creator>
      <dc:date>2024-01-10T15:20:27Z</dc:date>
    </item>
    <item>
      <title>Re: Replay(backfill) DLT CDC using kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56881#M30673</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/55846"&gt;@532664&lt;/a&gt;&amp;nbsp;thank you for your response! I will check the&amp;nbsp;&lt;SPAN&gt;pipelines.reset.allowed option, that is interesting! &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 10 Jan 2024 18:39:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/replay-backfill-dlt-cdc-using-kafka/m-p/56881#M30673</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-01-10T18:39:11Z</dc:date>
    </item>
  </channel>
</rss>

