<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Avoiding full refresh in CDF data on DLT updates in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/avoiding-full-refresh-in-cdf-data-on-dlt-updates/m-p/102257#M41037</link>
    <description>&lt;P&gt;Thank you, &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/106294"&gt;@Alberto_Umana&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;One of the main reasons we chose to use DLT was its ease of setup and monitoring, as well as the ability to avoid creating custom Spark Streaming jobs. From what I understand, your suggestion is to use both DLT and Spark Streaming jobs, which could potentially increase computing costs.&lt;/P&gt;&lt;P&gt;Does this mean DLT is not an appropriate tool for such a simple use case, where we are counting streaming events per group?&lt;/P&gt;</description>
    <pubDate>Mon, 16 Dec 2024 13:28:14 GMT</pubDate>
    <dc:creator>buraksivrikaya</dc:creator>
    <dc:date>2024-12-16T13:28:14Z</dc:date>
    <item>
      <title>Avoiding full refresh in CDF data on DLT updates</title>
      <link>https://community.databricks.com/t5/data-engineering/avoiding-full-refresh-in-cdf-data-on-dlt-updates/m-p/102248#M41033</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;We have a DLT pipeline with the following source code:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StringType, StructField, StructType

KAFKA_TOPIC = "..."
BOOTSTRAP_SERVERS = "..."

# Define the schema of the JSON messages
message_context_schema = StructType(
    [
        StructField("lectureId", LongType()),
        StructField("sectionId", LongType()),
        StructField("courseId", LongType()),
    ]
)

message_schema = StructType(
    [
        StructField("chatMessage", StringType()),
        StructField("messageContext", message_context_schema),
        StructField("visitorUuid", StringType()),
    ]
)


@dlt.table(name="dlt_example_kafka_raw", comment="Raw data ingested from Kafka topic.")
def kafka_raw():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.type", "PEM")
        .option(
            "kafka.ssl.truststore.location",
            "/dbfs/FileStore/udemy_ca_issuing_chain_udsrvs_net.pem",
        )
        .load()
    )


@dlt.table(
    name="dlt_example_kafka_parsed",
    comment="Parsed Kafka data with JSON fields extracted.",
)
def kafka_parsed():
    return (
        dlt.read_stream("dlt_example_kafka_raw")
        .selectExpr("CAST(value AS STRING) as json_value")
        .select(from_json(col("json_value"), message_schema).alias("data"))
        .select(
            "data.chatMessage",
            "data.messageContext",
            col("data.visitorUuid").alias("visitor_id"),
        )
    )


@dlt.table(
    name="dlt_example_visitor_message_counts",
    comment="Counts of messages per visitor id.",
    table_properties={"pipelines.reset.allowed": "false"},
)
def visitor_message_counts():
    return dlt.read_stream("dlt_example_kafka_parsed").groupBy("visitor_id").count()&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;We are capturing the change data feed from the final DLT table (dlt_example_visitor_message_counts) to be processed by other workflows. The issue we’re facing is that whenever a single Kafka event occurs, the entire data in the final table is refreshed, whereas we only want a single row to be upserted.&lt;BR /&gt;&lt;BR /&gt;Let me clarify it with an example:&lt;/SPAN&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;SPAN&gt;dlt_example_visitor_message_counts&lt;/SPAN&gt;&amp;nbsp;is supposed to store the chat counts per visitor id. There can be multiple rows, each corresponding to a different visitor id.&lt;/LI&gt;&lt;LI&gt;Whenever a visitor starts a chat, we see an event in the source Kafka queue, and we should increment the count of the corresponding visitor in the&amp;nbsp;&lt;SPAN&gt;dlt_example_visitor_message_counts&lt;/SPAN&gt;&amp;nbsp;DLT table.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;table at t0&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;visitor_A: 2
visitor_B: 8
visitor_C: 5&lt;/LI-CODE&gt;&lt;P&gt;visitor_B starts a new chat and we get a new event.&lt;/P&gt;&lt;P&gt;table at t1&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;visitor_A: 2
visitor_B: 9
visitor_C: 5&lt;/LI-CODE&gt;&lt;P&gt;We’re OK with the output of the table, but when we look at its change data feed, we see the following:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;visitor_A insert 2
visitor_B insert 9
visitor_C insert 5
visitor_A delete 2
visitor_B delete 8
visitor_C delete 5&lt;/LI-CODE&gt;&lt;P&gt;As you can see, all rows have been deleted and inserted. Instead, we would like to see the following in the CDF data:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;visitor_B upsert 9&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;Do you have any recommendations on how to modify the code or configuration of this pipeline to achieve that?&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 16 Dec 2024 12:58:57 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/avoiding-full-refresh-in-cdf-data-on-dlt-updates/m-p/102248#M41033</guid>
      <dc:creator>buraksivrikaya</dc:creator>
      <dc:date>2024-12-16T12:58:57Z</dc:date>
    </item>
    <item>
      <title>Re: Avoiding full refresh in CDF data on DLT updates</title>
      <link>https://community.databricks.com/t5/data-engineering/avoiding-full-refresh-in-cdf-data-on-dlt-updates/m-p/102253#M41034</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/30233"&gt;@buraksivrikaya&lt;/a&gt;,&lt;/P&gt;
&lt;P class="p1"&gt;To help avoiding a full refresh in the Change Data Feed (CDF) for the dlt_example_visitor_message_counts table, you can try the approach to separate the aggregation logic from the Delta Live Tables (DLT) pipeline.&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;OL class="ol1"&gt;
&lt;LI class="li1"&gt;&lt;STRONG&gt;Remove the Aggregation from DLT&lt;/STRONG&gt;: Instead of performing the aggregation within the DLT pipeline, you can create a DataFrame with the aggregation logic from the dlt_example_kafka_parsed table and then merge it into the target table visitor_message_counts outside of DLT.&lt;/LI&gt;
&lt;LI class="li1"&gt;&lt;STRONG&gt;Create a Separate Streaming Job&lt;/STRONG&gt;: Schedule a separate streaming job that runs after the DLT pipeline to calculate the counts and merge them into the target table. This way, you can avoid the complete output mode that causes the full refresh.&lt;/LI&gt;
&lt;LI class="li1"&gt;&lt;STRONG&gt;Use Structured Streaming with Watermarking&lt;/STRONG&gt;: If you prefer to keep the aggregation within the DLT pipeline, you can try using watermarking to manage the state and avoid full recomputes. However, this might still result in a full refresh if the watermarking does not align perfectly with your data update patterns&lt;/LI&gt;
&lt;/OL&gt;
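&lt;P class="p1"&gt;Option 3 could be sketched roughly as follows. Note this assumes the parsed stream carries an event-time timestamp column (called "event_ts" here for illustration; it is not part of the original schema):&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;# Sketch only: watermarked, windowed aggregation so Spark can bound its state.
# "event_ts" is a hypothetical event-time column; the pipeline would need to
# extract one from the Kafka message or use the Kafka record timestamp.
from pyspark.sql.functions import window

windowed_counts = (
    spark.readStream.table("dlt_example_kafka_parsed")
    .withWatermark("event_ts", "10 minutes")
    .groupBy(window("event_ts", "5 minutes"), "visitor_id")
    .count()
)&lt;/LI-CODE&gt;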
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;For example:&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# Define the schema of the JSON messages&lt;/P&gt;
&lt;P class="p1"&gt;message_context_schema = StructType(&lt;/P&gt;
&lt;P class="p1"&gt;[&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;StructField("lectureId", LongType()),&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;StructField("sectionId", LongType()),&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;StructField("courseId", LongType()),&lt;/P&gt;
&lt;P class="p1"&gt;]&lt;/P&gt;
&lt;P class="p1"&gt;)&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;message_schema = StructType(&lt;/P&gt;
&lt;P class="p1"&gt;[&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;StructField("chatMessage", StringType()),&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;StructField("messageContext", message_context_schema),&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;StructField("visitorUuid", StringType()),&lt;/P&gt;
&lt;P class="p1"&gt;]&lt;/P&gt;
&lt;P class="p1"&gt;)&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;@dlt.table(name="dlt_example_kafka_raw", comment="Raw data ingested from Kafka topic.")&lt;/P&gt;
&lt;P class="p1"&gt;def kafka_raw():&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;return (&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;spark.readStream.format("kafka")&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.option("subscribe", KAFKA_TOPIC)&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.option("startingOffsets", "latest")&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.option("kafka.security.protocol", "SSL")&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.option("kafka.ssl.truststore.type", "PEM")&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.option(&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;"kafka.ssl.truststore.location",&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;"/dbfs/FileStore/udemy_ca_issuing_chain_udsrvs_net.pem",&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;)&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.load()&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;)&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;@dlt.table(&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;name="dlt_example_kafka_parsed",&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;comment="Parsed Kafka data with JSON fields extracted.",&lt;/P&gt;
&lt;P class="p1"&gt;)&lt;/P&gt;
&lt;P class="p1"&gt;def kafka_parsed():&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;return (&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;dlt.read_stream("dlt_example_kafka_raw")&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.selectExpr("CAST(value AS STRING) as json_value")&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.select(from_json(col("json_value"), message_schema).alias("data"))&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.select(&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;"data.chatMessage",&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;"data.messageContext",&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;col("data.visitorUuid").alias("visitor_id"),&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;)&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;)&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# Remove the aggregation logic from DLT&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# @dlt.table(&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# &lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;name="dlt_example_visitor_message_counts",&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# &lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;comment="Counts of messages per visitor id.",&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# &lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;table_properties={"pipelines.reset.allowed": "false"},&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# )&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# def visitor_message_counts():&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# &lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;return dlt.read_stream("dlt_example_kafka_parsed").groupBy("visitor_id").count()&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# Instead, create a separate streaming job to handle the aggregation&lt;/P&gt;
&lt;P class="p1"&gt;def update_visitor_message_counts():&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;kafka_parsed_df = spark.readStream.table("dlt_example_kafka_parsed")&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;visitor_message_counts_df = kafka_parsed_df.groupBy("visitor_id").count()&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;visitor_message_counts_df.writeStream \&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.format("delta") \&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.outputMode("update") \&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.option("checkpointLocation", "/path/to/checkpoint") \&lt;/P&gt;
&lt;P class="p1"&gt;&lt;SPAN class="Apple-converted-space"&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;.start("/path/to/visitor_message_counts")&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;# Schedule this job to run after the DLT pipeline&lt;/P&gt;</description>
      <pubDate>Mon, 16 Dec 2024 13:18:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/avoiding-full-refresh-in-cdf-data-on-dlt-updates/m-p/102253#M41034</guid>
      <dc:creator>Alberto_Umana</dc:creator>
      <dc:date>2024-12-16T13:18:44Z</dc:date>
    </item>
    <item>
      <title>Re: Avoiding full refresh in CDF data on DLT updates</title>
      <link>https://community.databricks.com/t5/data-engineering/avoiding-full-refresh-in-cdf-data-on-dlt-updates/m-p/102257#M41037</link>
      <description>&lt;P&gt;Thank you, &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/106294"&gt;@Alberto_Umana&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;One of the main reasons we chose to use DLT was its ease of setup and monitoring, as well as the ability to avoid creating custom Spark Streaming jobs. From what I understand, your suggestion is to use both DLT and Spark Streaming jobs, which could potentially increase computing costs.&lt;/P&gt;&lt;P&gt;Does this mean DLT is not an appropriate tool for such a simple use case, where we are counting streaming events per group?&lt;/P&gt;</description>
      <pubDate>Mon, 16 Dec 2024 13:28:14 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/avoiding-full-refresh-in-cdf-data-on-dlt-updates/m-p/102257#M41037</guid>
      <dc:creator>buraksivrikaya</dc:creator>
      <dc:date>2024-12-16T13:28:14Z</dc:date>
    </item>
  </channel>
</rss>

