Hi @buraksivrikaya,
To help avoid a full refresh of the dlt_example_visitor_message_counts table when using Change Data Feed (CDF), you can separate the aggregation logic from the Delta Live Tables (DLT) pipeline:
- Remove the Aggregation from DLT: Instead of performing the aggregation within the DLT pipeline, build a DataFrame that applies the aggregation logic to the dlt_example_kafka_parsed table and then merge the result into the target table visitor_message_counts outside of DLT.
- Create a Separate Streaming Job: Schedule a separate streaming job that runs after the DLT pipeline to calculate the counts and merge them into the target table. This way, you can avoid the complete output mode that causes the full refresh.
- Use Structured Streaming with Watermarking: If you prefer to keep the aggregation within the DLT pipeline, you can use watermarking to bound the aggregation state and avoid full recomputes. However, this might still result in a full refresh if the watermark does not align well with your data update patterns (a rough sketch of this option follows the example code below).
For example:
# Imports needed by the pipeline and the MERGE job below
import dlt
from delta.tables import DeltaTable
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Define the schema of the JSON messages
message_context_schema = StructType(
    [
        StructField("lectureId", LongType()),
        StructField("sectionId", LongType()),
        StructField("courseId", LongType()),
    ]
)

message_schema = StructType(
    [
        StructField("chatMessage", StringType()),
        StructField("messageContext", message_context_schema),
        StructField("visitorUuid", StringType()),
    ]
)

@dlt.table(name="dlt_example_kafka_raw", comment="Raw data ingested from Kafka topic.")
def kafka_raw():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.type", "PEM")
        .option(
            "kafka.ssl.truststore.location",
            "/dbfs/FileStore/udemy_ca_issuing_chain_udsrvs_net.pem",
        )
        .load()
    )

@dlt.table(
    name="dlt_example_kafka_parsed",
    comment="Parsed Kafka data with JSON fields extracted.",
)
def kafka_parsed():
    return (
        dlt.read_stream("dlt_example_kafka_raw")
        .selectExpr("CAST(value AS STRING) as json_value")
        .select(from_json(col("json_value"), message_schema).alias("data"))
        .select(
            "data.chatMessage",
            "data.messageContext",
            col("data.visitorUuid").alias("visitor_id"),
        )
    )

# Remove the aggregation logic from DLT
# @dlt.table(
#     name="dlt_example_visitor_message_counts",
#     comment="Counts of messages per visitor id.",
#     table_properties={"pipelines.reset.allowed": "false"},
# )
# def visitor_message_counts():
#     return dlt.read_stream("dlt_example_kafka_parsed").groupBy("visitor_id").count()

# Instead, create a separate streaming job that computes the counts and merges
# them into the target Delta table. A streaming aggregation cannot be written to
# a Delta sink in update mode directly, so foreachBatch is used to run the MERGE.
def upsert_counts(micro_batch_df, batch_id):
    # The target Delta table must already exist at this path before the first MERGE.
    (
        DeltaTable.forPath(spark, "/path/to/visitor_message_counts")
        .alias("t")
        .merge(micro_batch_df.alias("s"), "t.visitor_id = s.visitor_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

def update_visitor_message_counts():
    kafka_parsed_df = spark.readStream.table("dlt_example_kafka_parsed")
    visitor_message_counts_df = kafka_parsed_df.groupBy("visitor_id").count()
    (
        visitor_message_counts_df.writeStream
        .foreachBatch(upsert_counts)
        .outputMode("update")
        .option("checkpointLocation", "/path/to/checkpoint")
        .start()
    )

# Schedule this job to run after the DLT pipeline
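
If you would rather keep the aggregation inside DLT (the third option above), a watermarked, windowed aggregation lets Spark drop old state instead of recomputing everything. The sketch below is only illustrative and rests on a few assumptions: it assumes dlt_example_kafka_parsed also carries Kafka's timestamp column (the parsed table above currently drops it), it changes the result from one running count per visitor to one count per visitor per window, and the table name dlt_example_visitor_message_counts_windowed plus the watermark and window durations are placeholders you would tune:

from pyspark.sql.functions import col, window

@dlt.table(
    name="dlt_example_visitor_message_counts_windowed",
    comment="Per-visitor message counts aggregated in 10-minute event-time windows.",
)
def visitor_message_counts_windowed():
    # Assumes dlt_example_kafka_parsed also selects the Kafka `timestamp` column.
    return (
        dlt.read_stream("dlt_example_kafka_parsed")
        .withWatermark("timestamp", "30 minutes")  # bound the aggregation state
        .groupBy(window(col("timestamp"), "10 minutes"), col("visitor_id"))
        .count()
    )

With the watermark in place, results can be emitted in append mode once a window closes, so the table does not need a full recompute on every update. If you go with the separate job instead, you could also add .trigger(availableNow=True) to the writeStream so a scheduled run processes the newly arrived data and then stops.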