Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Avoiding full refresh in CDF data on DLT updates

buraksivrikaya
New Contributor II

Hi,

We have a DLT pipeline with the following source code:

import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StringType, StructField, StructType

KAFKA_TOPIC = "..."
BOOTSTRAP_SERVERS = "..."

# Define the schema of the JSON messages
message_context_schema = StructType(
    [
        StructField("lectureId", LongType()),
        StructField("sectionId", LongType()),
        StructField("courseId", LongType()),
    ]
)

message_schema = StructType(
    [
        StructField("chatMessage", StringType()),
        StructField("messageContext", message_context_schema),
        StructField("visitorUuid", StringType()),
    ]
)


@dlt.table(name="dlt_example_kafka_raw", comment="Raw data ingested from Kafka topic.")
def kafka_raw():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.type", "PEM")
        .option(
            "kafka.ssl.truststore.location",
            "/dbfs/FileStore/udemy_ca_issuing_chain_udsrvs_net.pem",
        )
        .load()
    )


@dlt.table(
    name="dlt_example_kafka_parsed",
    comment="Parsed Kafka data with JSON fields extracted.",
)
def kafka_parsed():
    return (
        dlt.read_stream("dlt_example_kafka_raw")
        .selectExpr("CAST(value AS STRING) as json_value")
        .select(from_json(col("json_value"), message_schema).alias("data"))
        .select(
            "data.chatMessage",
            "data.messageContext",
            col("data.visitorUuid").alias("visitor_id"),
        )
    )


@dlt.table(
    name="dlt_example_visitor_message_counts",
    comment="Counts of messages per visitor id.",
    table_properties={"pipelines.reset.allowed": "false"},
)
def visitor_message_counts():
    return dlt.read_stream("dlt_example_kafka_parsed").groupBy("visitor_id").count()

We are capturing the change data feed (CDF) of the final DLT table (dlt_example_visitor_message_counts) so that it can be processed by other workflows. The issue we're facing is that whenever a single Kafka event arrives, the entire contents of the final table are rewritten, whereas we only want a single row to be upserted.
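
For context, the downstream workflows consume the change feed along these lines (a sketch only; the unqualified table name and the starting version are placeholders, and in practice the last processed commit version is tracked elsewhere):

# Illustrative read of the change data feed (starting version is a placeholder)
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .table("dlt_example_visitor_message_counts")
)
changes.select("visitor_id", "count", "_change_type", "_commit_version").show()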

Let me clarify it with an example:

  • dlt_example_visitor_message_counts is supposed to store the chat counts per visitor id. There can be multiple rows each corresponding to a different visitor id.
  • Whenever a visitor starts a chat, we see an event in the source Kafka queue, and we should increment the count of the corresponding visitor in the dlt_example_visitor_message_counts table.

table at t0

visitor_A: 2
visitor_B: 8
visitor_C: 5

visitor_B starts a new chat and we get a new event.

table at t1

visitor_A: 2
visitor_B: 9
visitor_C: 5

We're OK with the output of the table, but when we look at its change data feed, we see the following:

visitor_A insert 2
visitor_B insert 9
visitor_C insert 5
visitor_A delete 2
visitor_B delete 8
visitor_C delete 5

As you can see, every row has been deleted and re-inserted. Instead, we would only like to see the following in the CDF data:

visitor_B upsert 9

Do you have any recommendations on how to modify the code or configuration of this pipeline to achieve that?

2 REPLIES

Alberto_Umana
Databricks Employee

Hi @buraksivrikaya,

To help avoid a full refresh in the Change Data Feed (CDF) for the dlt_example_visitor_message_counts table, you can try separating the aggregation logic from the Delta Live Tables (DLT) pipeline.

  1. Remove the Aggregation from DLT: Instead of performing the aggregation within the DLT pipeline, build the aggregated DataFrame on top of the dlt_example_kafka_parsed table and then merge it into the target table visitor_message_counts outside of DLT.
  2. Create a Separate Streaming Job: Schedule a separate streaming job that runs after the DLT pipeline to calculate the counts and merge them into the target table. This way, you can avoid the complete output mode that causes the full refresh.
  3. Use Structured Streaming with Watermarking: If you prefer to keep the aggregation within the DLT pipeline, you can try using watermarking to manage the state and avoid full recomputes. However, this might still result in a full refresh if the watermarking does not align with your data update patterns (a rough sketch of this option follows the example below).

For example:

# Define the schema of the JSON messages
message_context_schema = StructType(
    [
        StructField("lectureId", LongType()),
        StructField("sectionId", LongType()),
        StructField("courseId", LongType()),
    ]
)

message_schema = StructType(
    [
        StructField("chatMessage", StringType()),
        StructField("messageContext", message_context_schema),
        StructField("visitorUuid", StringType()),
    ]
)


@dlt.table(name="dlt_example_kafka_raw", comment="Raw data ingested from Kafka topic.")
def kafka_raw():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.type", "PEM")
        .option(
            "kafka.ssl.truststore.location",
            "/dbfs/FileStore/udemy_ca_issuing_chain_udsrvs_net.pem",
        )
        .load()
    )


@dlt.table(
    name="dlt_example_kafka_parsed",
    comment="Parsed Kafka data with JSON fields extracted.",
)
def kafka_parsed():
    return (
        dlt.read_stream("dlt_example_kafka_raw")
        .selectExpr("CAST(value AS STRING) as json_value")
        .select(from_json(col("json_value"), message_schema).alias("data"))
        .select(
            "data.chatMessage",
            "data.messageContext",
            col("data.visitorUuid").alias("visitor_id"),
        )
    )

# Remove the aggregation logic from DLT

# @dlt.table(
#     name="dlt_example_visitor_message_counts",
#     comment="Counts of messages per visitor id.",
#     table_properties={"pipelines.reset.allowed": "false"},
# )
# def visitor_message_counts():
#     return dlt.read_stream("dlt_example_kafka_parsed").groupBy("visitor_id").count()

# Instead, create a separate streaming job to handle the aggregation. Because the
# Delta sink does not support the "update" output mode directly, the updated counts
# of each micro-batch are merged (upserted) into the target table via foreachBatch.
from delta.tables import DeltaTable


def upsert_counts(batch_df, batch_id):
    # The target Delta table is assumed to already exist at this (placeholder) path
    target = DeltaTable.forPath(spark, "/path/to/visitor_message_counts")
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.visitor_id = s.visitor_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


def update_visitor_message_counts():
    kafka_parsed_df = spark.readStream.table("dlt_example_kafka_parsed")
    visitor_message_counts_df = kafka_parsed_df.groupBy("visitor_id").count()

    (
        visitor_message_counts_df.writeStream
        .outputMode("update")
        .option("checkpointLocation", "/path/to/checkpoint")
        .foreachBatch(upsert_counts)
        .start()
    )

# Schedule this job to run after the DLT pipeline
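
As a rough illustration of option 3, a watermarked, windowed aggregation kept inside the DLT pipeline could look like the sketch below. This is only a sketch under assumptions: it presumes an event_ts column is carried through dlt_example_kafka_parsed (for example, the Kafka record's timestamp column aliased as event_ts), and it changes the output to counts per visitor per 10-minute window, which may or may not fit your downstream consumers.

from pyspark.sql.functions import window


@dlt.table(
    name="dlt_example_visitor_message_counts_windowed",
    comment="Illustrative: message counts per visitor id and 10-minute window.",
)
def visitor_message_counts_windowed():
    return (
        dlt.read_stream("dlt_example_kafka_parsed")
        # event_ts is assumed to be selected in kafka_parsed, e.g.
        # col("timestamp").alias("event_ts") from the Kafka source columns
        .withWatermark("event_ts", "10 minutes")
        .groupBy(window("event_ts", "10 minutes"), "visitor_id")
        .count()
    )

Whether this produces row-level changes in the CDF still depends on how the streaming aggregation is materialized, as noted in point 3.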

buraksivrikaya
New Contributor II

Thank you, @Alberto_Umana,

One of the main reasons we chose to use DLT was its ease of setup and monitoring, as well as the ability to avoid creating custom Spark Streaming jobs. From what I understand, your suggestion is to use both DLT and Spark Streaming jobs, which could potentially increase computing costs.

Does this mean DLT is not an appropriate tool for such a simple use case, where we are counting streaming events per group?
