12-16-2024 04:58 AM
Hi,
We have a DLT pipeline with the following source code:
import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StringType, StructField, StructType

KAFKA_TOPIC = "..."
BOOTSTRAP_SERVERS = "..."

# Define the schema of the JSON messages
message_context_schema = StructType(
    [
        StructField("lectureId", LongType()),
        StructField("sectionId", LongType()),
        StructField("courseId", LongType()),
    ]
)

message_schema = StructType(
    [
        StructField("chatMessage", StringType()),
        StructField("messageContext", message_context_schema),
        StructField("visitorUuid", StringType()),
    ]
)


@dlt.table(name="dlt_example_kafka_raw", comment="Raw data ingested from Kafka topic.")
def kafka_raw():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.type", "PEM")
        .option(
            "kafka.ssl.truststore.location",
            "/dbfs/FileStore/udemy_ca_issuing_chain_udsrvs_net.pem",
        )
        .load()
    )


@dlt.table(
    name="dlt_example_kafka_parsed",
    comment="Parsed Kafka data with JSON fields extracted.",
)
def kafka_parsed():
    return (
        dlt.read_stream("dlt_example_kafka_raw")
        .selectExpr("CAST(value AS STRING) as json_value")
        .select(from_json(col("json_value"), message_schema).alias("data"))
        .select(
            "data.chatMessage",
            "data.messageContext",
            col("data.visitorUuid").alias("visitor_id"),
        )
    )


@dlt.table(
    name="dlt_example_visitor_message_counts",
    comment="Counts of messages per visitor id.",
    table_properties={"pipelines.reset.allowed": "false"},
)
def visitor_message_counts():
    return dlt.read_stream("dlt_example_kafka_parsed").groupBy("visitor_id").count()
We are capturing the change data feed (CDF) from the final DLT table (dlt_example_visitor_message_counts) so it can be processed by other workflows. The issue we're facing is that whenever a single Kafka event arrives, the entire contents of the final table are rewritten, whereas we only want the affected row to be upserted.
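For reference, we read the change feed roughly like this (a minimal sketch; the starting version is illustrative and we assume the table is readable by name from the pipeline's target schema):

# Sketch: read the CDF of the final table (starting version is illustrative)
changes_df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("dlt_example_visitor_message_counts")
)
changes_df.select("visitor_id", "count", "_change_type", "_commit_version").show()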
Let me clarify it with an example:
table at t0
visitor_A: 2
visitor_B: 8
visitor_C: 5
visitor_B starts a new chat and we get a new event.
table at t1
visitor_A: 2
visitor_B: 9
visitor_C: 5
We're OK with the output of the table, but when we look at its change data feed, we see the following:
visitor_A insert 2
visitor_B insert 9
visitor_C insert 5
visitor_A delete 2
visitor_B delete 8
visitor_C delete 5
As you can see, every row was deleted and re-inserted. Instead, we would like to see only the following in the CDF data:
visitor_B upsert 9
Do you have any recommendations on how to modify the code or configuration of this pipeline to achieve that?
12-16-2024 05:18 AM
Hi @buraksivrikaya,
To avoid a full refresh showing up in the Change Data Feed (CDF) of the dlt_example_visitor_message_counts table, you can try separating the aggregation logic from the Delta Live Tables (DLT) pipeline.
For example:
# Define the schema of the JSON messages
message_context_schema = StructType(
    [
        StructField("lectureId", LongType()),
        StructField("sectionId", LongType()),
        StructField("courseId", LongType()),
    ]
)

message_schema = StructType(
    [
        StructField("chatMessage", StringType()),
        StructField("messageContext", message_context_schema),
        StructField("visitorUuid", StringType()),
    ]
)


@dlt.table(name="dlt_example_kafka_raw", comment="Raw data ingested from Kafka topic.")
def kafka_raw():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.type", "PEM")
        .option(
            "kafka.ssl.truststore.location",
            "/dbfs/FileStore/udemy_ca_issuing_chain_udsrvs_net.pem",
        )
        .load()
    )


@dlt.table(
    name="dlt_example_kafka_parsed",
    comment="Parsed Kafka data with JSON fields extracted.",
)
def kafka_parsed():
    return (
        dlt.read_stream("dlt_example_kafka_raw")
        .selectExpr("CAST(value AS STRING) as json_value")
        .select(from_json(col("json_value"), message_schema).alias("data"))
        .select(
            "data.chatMessage",
            "data.messageContext",
            col("data.visitorUuid").alias("visitor_id"),
        )
    )


# Remove the aggregation logic from DLT
# @dlt.table(
#     name="dlt_example_visitor_message_counts",
#     comment="Counts of messages per visitor id.",
#     table_properties={"pipelines.reset.allowed": "false"},
# )
# def visitor_message_counts():
#     return dlt.read_stream("dlt_example_kafka_parsed").groupBy("visitor_id").count()


# Instead, create a separate streaming job to handle the aggregation
def update_visitor_message_counts():
    kafka_parsed_df = spark.readStream.table("dlt_example_kafka_parsed")
    visitor_message_counts_df = kafka_parsed_df.groupBy("visitor_id").count()

    visitor_message_counts_df.writeStream \
        .format("delta") \
        .outputMode("update") \
        .option("checkpointLocation", "/path/to/checkpoint") \
        .start("/path/to/visitor_message_counts")


# Schedule this job to run after the DLT pipeline
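One caveat with the sketch above: as far as I know, the plain Delta streaming sink only accepts append and complete output modes, so the update-mode write may be rejected. An alternative is to upsert each micro-batch with MERGE via foreachBatch. A minimal sketch, assuming a target table visitor_message_counts already exists with visitor_id and count columns (table name and checkpoint path are placeholders):

# Sketch only: upsert each micro-batch into the target table with MERGE.
def upsert_counts(batch_df, batch_id):
    batch_df.createOrReplaceTempView("batch_counts")
    batch_df.sparkSession.sql(
        """
        MERGE INTO visitor_message_counts AS t
        USING batch_counts AS s
        ON t.visitor_id = s.visitor_id
        WHEN MATCHED THEN UPDATE SET t.`count` = s.`count`
        WHEN NOT MATCHED THEN INSERT *
        """
    )

(
    spark.readStream.table("dlt_example_kafka_parsed")
    .groupBy("visitor_id")
    .count()
    .writeStream.outputMode("update")
    .option("checkpointLocation", "/path/to/checkpoint")
    .foreachBatch(upsert_counts)
    .start()
)

With MERGE, the change feed of the target table should only record changes for the visitor ids touched in each micro-batch, which is closer to the single-row upsert behavior you described.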
12-16-2024 05:28 AM
Thank you, @Alberto_Umana,
One of the main reasons we chose to use DLT was its ease of setup and monitoring, as well as the ability to avoid creating custom Spark Streaming jobs. From what I understand, your suggestion is to use both DLT and Spark Streaming jobs, which could potentially increase computing costs.
Does this mean DLT is not an appropriate tool for such a simple use case, where we are counting streaming events per group?