Databricks Spark Declarative Pipelines (SDP) makes handling Slowly Changing Dimensions (SCD) easy with its Auto CDC feature. I had a customer who needed to explode an array in their streaming table and wanted their pipeline to look like this:
Source -> explode() array into multiple rows (same key) -> Auto CDC -> Materialized View
Unfortunately, this pattern requires deleting records by omission: when a feature no longer appears in the incoming array, its row must go away. Auto CDC cannot delete all the records for a key and then insert the new set of features, so it will not work in this position. That means the explode() has to move into the logic of the Materialized View, like this:
Source -> Auto CDC -> explode() array into multiple rows (same key) -> Materialized View
However, explode() now produces rows whose order is nondeterministic. As a result, the Materialized View can no longer take advantage of incremental processing and must fully recompute on every update, which drives up processing and infrastructure cost. We therefore need an alternative approach that does not rely on Auto CDC and MVs.
This post walks through a practical, production-grade pattern for replacing all exploded records by key, from raw nested arrays to final upserts, using PySpark and SQL on Databricks Delta Lake.
The Scenario: Exploded Replacement
Suppose you have incoming data where features are stored as arrays, and each update replaces all features for a given key. The workflow you need: for each incoming key, delete every existing feature row and insert the newly exploded set.
Again, while Auto CDC can handle the replacement itself, pushing explode() into the downstream MV forces a full recompute, meaning your costs and latency go up significantly. So we will have to do it the hard way (that is, manually).
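For reference, here is roughly what the Auto CDC route (the second flow above) would look like as pipeline code. This is only a sketch, assuming the classic dlt Python API and illustrative table names; it is here to make clear where the full recompute comes from, not as something to copy.
import dlt
from pyspark.sql.functions import explode, split, col

# Auto CDC (SCD type 1) into a streaming target keyed by product_id
dlt.create_streaming_table("product_features_cdc")

dlt.apply_changes(
    target="product_features_cdc",
    source="source_updates",          # illustrative upstream streaming source
    keys=["product_id"],
    sequence_by=col("updated_at"),
    stored_as_scd_type=1,
)

# Downstream MV that explodes the array; the nondeterministic row order here
# is what forces a full recompute on every update
@dlt.table(name="product_features_mv")
def product_features_mv():
    return (
        dlt.read("product_features_cdc")
        .withColumn("feature_raw", explode("feature_array"))
        .withColumn("feature_name", split(col("feature_raw"), ":")[0])
        .withColumn("feature_value", split(col("feature_raw"), ":")[1])
        .select("product_id", "feature_name", "feature_value", "updated_at")
    )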
Step 1: The Raw Data (Arrays That Must Be Exploded)
Incoming data in this example arrives in the following format: records that contain a key, an array of features, and an update timestamp.
incoming_data = [
    ("PROD001", ["color:red", "size:large", "material:cotton"], "2024-10-01 10:00:00"),
    ("PROD002", ["color:blue", "size:small"], "2024-10-01 10:05:00"),
    ("PROD003", ["brand:databricks", "category:shoes", "color:white", "size:10"], "2024-01-02 15:00:00"),
]
Step 2: Explode Arrays to Rows in PySpark
First, you explode() the feature array column into rows and parse each feature:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

# Build a DataFrame from the raw incoming records
df_incoming = spark.createDataFrame(
    incoming_data, ["product_id", "feature_array", "updated_at"]
)

# Explode the feature array into one row per feature
df_exploded = df_incoming.withColumn("feature_raw", explode(col("feature_array")))

# Parse each "name:value" string into separate columns
df_processed = (df_exploded
    .withColumn("feature_name", split(col("feature_raw"), ":")[0])
    .withColumn("feature_value", split(col("feature_raw"), ":")[1])
    .select("product_id", "feature_name", "feature_value", "updated_at")
)

df_processed.write.mode("overwrite").saveAsTable("catalog.schema.product_features")
spark.table("catalog.schema.product_features").display()
Result:
+----------+------------+-------------+-------------------+
|product_id|feature_name|feature_value|         updated_at|
+----------+------------+-------------+-------------------+
|   PROD001|       color|          red|2024-10-01 10:00:00|
|   PROD001|        size|        large|2024-10-01 10:00:00|
|   PROD001|    material|       cotton|2024-10-01 10:00:00|
|   PROD002|       color|         blue|2024-10-01 10:05:00|
|   PROD002|        size|        small|2024-10-01 10:05:00|
|   PROD003|       brand|   databricks|2024-01-02 15:00:00|
|   PROD003|    category|        shoes|2024-01-02 15:00:00|
|   PROD003|       color|        white|2024-01-02 15:00:00|
|   PROD003|        size|           10|2024-01-02 15:00:00|
+----------+------------+-------------+-------------------+
Step 3: Replace Records by Key (Delete and Insert)
The goal is to delete all previous records for each incoming product key and then insert the new, exploded values; a batch-level sketch of this idea follows the sample data below.
New data will now be ingested into the pipeline:
incoming_data_df = spark.createDataFrame([
    ("PROD001", ["size:large", "material:cotton"], "2024-01-02 15:00:00"),
    ("PROD002", ["size:small"], "2024-01-02 15:00:00"),
], ["product_id", "feature_array", "updated_at"])

incoming_data_df.write.mode("overwrite").saveAsTable("catalog.schema.source_streaming_table")
spark.table("catalog.schema.source_streaming_table").display()
+----------+--------------------+-------------------+
|product_id|       feature_array|         updated_at|
+----------+--------------------+-------------------+
|   PROD001|[size:large, mate...|2024-01-02 15:00:00|
|   PROD002|        [size:small]|2024-01-02 15:00:00|
+----------+--------------------+-------------------+
For this new data, we expect an implicit delete of the color row for PROD001 and of the color row for PROD002, because the incoming arrays no longer contain a color feature.
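Before wiring this into a stream, it may help to see the replace-by-key idea as plain batch operations against the target Delta table on Databricks. The sketch below is illustrative only (the incoming_keys temp view is a name I am inventing for the example); the foreachBatch version that follows is the actual implementation.
# Illustrative batch version of the replace-by-key pattern.
# Collect the keys present in the incoming data...
spark.table("catalog.schema.source_streaming_table") \
    .select("product_id").distinct() \
    .createOrReplaceTempView("incoming_keys")

# ...delete every existing feature row for those keys...
spark.sql("""
    DELETE FROM catalog.schema.product_features
    WHERE product_id IN (SELECT product_id FROM incoming_keys)
""")

# ...then insert the freshly exploded rows for the same keys.
spark.sql("""
    INSERT INTO catalog.schema.product_features
    SELECT product_id,
           split(feature_raw, ':')[0] AS feature_name,
           split(feature_raw, ':')[1] AS feature_value,
           updated_at
    FROM (
        SELECT product_id, explode(feature_array) AS feature_raw, updated_at
        FROM catalog.schema.source_streaming_table
    )
""")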
Define a foreachBatch Function to Replace Data by Key
The approach here is to write a foreachBatch function that performs the explode, deletes all rows whose keys appear in the incoming micro-batch, and then appends the newly exploded rows. foreachBatch is needed because we have to run this custom explode/delete/append logic against the target Delta table for every micro-batch. Note that the function is effectively idempotent, so it can safely be re-run if a batch fails: deleting records that are no longer there is harmless, and the last step is a single append, so a retry will not leave partial results behind. We can strengthen that guarantee further by setting the txnAppId and txnVersion options on the batch write, which lets Delta skip an append it has already committed.
from pyspark.sql.functions import explode, split, col
from delta.tables import DeltaTable

app_id = "12345678"  # used as txnAppId for idempotent batch writes

def process_batch(batch_df, batch_id):
    # Step 1: Explode the incoming batch and parse "name:value" features
    exploded = (batch_df
        .withColumn("feature_item", explode("feature_array"))
        .withColumn("feature_name", split(col("feature_item"), ":")[0])
        .withColumn("feature_value", split(col("feature_item"), ":")[1])
        .select("product_id", "feature_name", "feature_value", "updated_at")
    )

    # Step 2: Get unique product_ids in this micro-batch
    keys_to_replace = exploded.select("product_id").distinct()

    # Step 3: Delete old records for these product_ids in the target Delta table
    delta_table = DeltaTable.forName(spark, "catalog.schema.product_features")
    (delta_table.alias("target")
        .merge(keys_to_replace.alias("source"), "target.product_id = source.product_id")
        .whenMatchedDelete()
        .execute())

    # Step 4: Insert the new exploded records (txnAppId/txnVersion make the append idempotent)
    (exploded.write.format("delta")
        .option("txnAppId", app_id)
        .option("txnVersion", batch_id)
        .mode("append")
        .saveAsTable("catalog.schema.product_features"))
We then apply this foreachBatch function to each micro-batch processed by our Structured Streaming query:
(spark.readStream
    .table("catalog.schema.source_streaming_table")  # or .format("cloudFiles")/Kafka/etc.
    .writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/features")
    .trigger(availableNow=True)
    .start()
)
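Once the availableNow run completes, we can read the target table back to confirm the final state:
# Verify the replacement: read back the target table
spark.table("catalog.schema.product_features").orderBy("product_id").display()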
Result
+----------+------------+-------------+-------------------+
|product_id|feature_name|feature_value|         updated_at|
+----------+------------+-------------+-------------------+
|   PROD001|        size|        large|2024-01-02 15:00:00|
|   PROD001|    material|       cotton|2024-01-02 15:00:00|
|   PROD002|        size|        small|2024-01-02 15:00:00|
|   PROD003|       brand|   databricks|2024-01-02 15:00:00|
|   PROD003|    category|        shoes|2024-01-02 15:00:00|
|   PROD003|       color|        white|2024-01-02 15:00:00|
|   PROD003|        size|           10|2024-01-02 15:00:00|
+----------+------------+-------------+-------------------+
Sure enough, the color features were deleted in both PROD001 and PROD002, so the code works.
Key Takeaways
Auto CDC cannot delete rows by omission, so exploding before Auto CDC does not work, and pushing explode() into a downstream MV forces full recomputes. A foreachBatch function that explodes each micro-batch, deletes all rows for the incoming keys, and appends the new rows gives you the same replace-by-key behaviour manually, and txnAppId/txnVersion on the batch write keep the append idempotent.
Here’s the full notebook for reference: https://github.com/howardwu-db/Manual-SCD-with-Explode/tree/main