howard_wu
Databricks Employee

Databricks Spark Declarative Pipelines (SDP) makes handling slowly changing dimensions (SCD) easy with its Auto CDC feature. I had a customer who needed to explode an array into multiple rows as part of their streaming table, and they wanted their pipeline to look like this:

Source -> explode() array into multiple rows (same key) -> Auto CDC -> Materialized View

Unfortunately, this pipeline also needs to delete records by omission of features; that is, it has to delete all the records for a key and then insert the new set of features, which Auto CDC cannot express, so Auto CDC will not work here. That means the explode() has to move into the logic of the Materialized View, like this:

Source -> Auto CDC -> explode() array into multiple rows (same key) -> Materialized View

However, the explode() function now produces rows in a nondeterministic order. That means the Materialized View can no longer take advantage of incremental processing and must fully recompute each time, increasing processing and infrastructure costs. We therefore need an alternative approach that does not rely on Auto CDC and MVs.
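For context, the first ordering (explode before Auto CDC) would look roughly like the sketch below, written against the classic DLT-style apply_changes API (SDP's Auto CDC flow is the newer equivalent); the table and view names here are illustrative, not from the customer's pipeline. The problem is visible in the keys: they must include the feature itself, so a feature that disappears from the incoming array is never deleted.

# Hedged sketch of the "explode before Auto CDC" ordering (DLT-style API; names are illustrative).
# Because each product explodes into several rows, the CDC key has to include the feature
# itself -- and then a feature that vanishes from the incoming array is never deleted,
# which is exactly the "delete by omission" problem described above.
import dlt
from pyspark.sql.functions import explode, split, col

@dlt.view
def exploded_features():
    return (
        spark.readStream.table("catalog.schema.source_streaming_table")
        .withColumn("feature_raw", explode(col("feature_array")))
        .withColumn("feature_name", split(col("feature_raw"), ":")[0])
        .withColumn("feature_value", split(col("feature_raw"), ":")[1])
    )

dlt.create_streaming_table("product_features_cdc")

dlt.apply_changes(
    target="product_features_cdc",
    source="exploded_features",
    keys=["product_id", "feature_name"],   # key must include the feature itself
    sequence_by=col("updated_at"),
    stored_as_scd_type=1,
)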

This post delves into a practical, production-grade pattern for replacing all exploded records by key, from raw nested arrays to final upserts, using PySpark and SQL on Databricks Delta Lake.

The Scenario: Exploded Replacement

Suppose you have incoming data where features are stored as arrays, and each update replaces all features for a given key. The workflow you need:

  1. Incoming record with arrays
  2. Explode the arrays into flat rows
  3. Delete all old exploded records for that key. This is what makes implicit deletes possible: features that no longer appear in the incoming array simply never get re-inserted.
  4. Insert the new exploded records

Again, while Auto CDC can handle the SCD part, the explode() in the subsequent MV will cause a full recompute, meaning your costs and latency go up significantly. So we will have to do it the hard way (that is, manually).

 

Step 1: The Raw Data (Arrays That Must Be Exploded)

Incoming data in this example arrives in the following format: each record contains a key, an array of features, and a timestamp.

incoming_data = [
    ("PROD001", ["color:red", "size:large", "material:cotton"], "2024-10-01 10:00:00"),
    ("PROD002", ["color:blue", "size:small"], "2024-10-01 10:05:00"),
    ("PROD003", ["brand:databricks", "category:shoes", "color:white", "size:10"], "2024-01-02 15:00:00")
]
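If you want the column types to be explicit rather than inferred (for example, to guarantee feature_array is typed as array<string>), you can optionally pass a schema when creating the DataFrame. This is just a sketch and is not required for the rest of the walkthrough:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Optional explicit schema: feature_array as array<string>; updated_at is kept as a
# string here (cast it to a timestamp later if needed).
incoming_schema = StructType([
    StructField("product_id", StringType(), False),
    StructField("feature_array", ArrayType(StringType()), True),
    StructField("updated_at", StringType(), True),
])

df_incoming = spark.createDataFrame(incoming_data, schema=incoming_schema)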

 

Step 2: Explode Arrays to Rows in PySpark

First, you explode() the feature array column into rows and parse each feature:

from pyspark.sql.functions import explode, split, col

# spark is available by default in Databricks notebooks
df_incoming = spark.createDataFrame(
    incoming_data, ["product_id", "feature_array", "updated_at"]
)

# explode() turns each element of feature_array into its own row,
# then split() parses the "name:value" strings into separate columns
df_exploded = df_incoming.withColumn("feature_raw", explode(col("feature_array")))
df_processed = (df_exploded
                 .withColumn("feature_name", split(col("feature_raw"), ":")[0])
                 .withColumn("feature_value", split(col("feature_raw"), ":")[1])
                 .select("product_id", "feature_name", "feature_value", "updated_at")
               )


df_processed.write.mode("overwrite").saveAsTable("catalog.schema.product_features")
spark.table("catalog.schema.product_features").display()

Result:

+----------+------------+-------------+-------------------+
|product_id|feature_name|feature_value|         updated_at|
+----------+------------+-------------+-------------------+
|   PROD001|       color|          red|2024-10-01 10:00:00|
|   PROD001|        size|        large|2024-10-01 10:00:00|
|   PROD001|    material|       cotton|2024-10-01 10:00:00|
|   PROD002|       color|         blue|2024-10-01 10:05:00|
|   PROD002|        size|        small|2024-10-01 10:05:00|
|   PROD003|       brand|   databricks|2024-01-02 15:00:00|
|   PROD003|    category|        shoes|2024-01-02 15:00:00|
|   PROD003|       color|        white|2024-01-02 15:00:00|
|   PROD003|        size|           10|2024-01-02 15:00:00|
+----------+------------+-------------+-------------------+
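As an aside, if you ever need the original position of each feature within its array (for instance, to make downstream ordering deterministic), posexplode() emits the array index alongside the value. A minimal sketch, not used in the rest of this pipeline:

from pyspark.sql.functions import posexplode, split, col

# posexplode() returns (position, value) pairs, so the array index survives the explode
df_with_pos = (
    df_incoming
    .select(
        "product_id",
        "updated_at",
        posexplode(col("feature_array")).alias("feature_pos", "feature_raw"),
    )
    .withColumn("feature_name", split(col("feature_raw"), ":")[0])
    .withColumn("feature_value", split(col("feature_raw"), ":")[1])
)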

 

Step 3: Replace Records by Key (Delete and Insert)

For each incoming product, the goal is to delete all previously stored records for that key, then insert the new, exploded values.
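Conceptually, for a single product the replacement boils down to a delete of its old rows followed by an append of its freshly exploded rows. A rough sketch of that group-wise overwrite is below; PROD001 is just an example key and new_exploded_df is a hypothetical DataFrame holding its freshly exploded rows. The streaming version later in this post does the same thing per micro-batch.

# Illustrative only: a group-wise overwrite for a single key.
# new_exploded_df is a hypothetical DataFrame with the freshly exploded rows for PROD001
# (product_id, feature_name, feature_value, updated_at).

# 1) Delete everything previously stored for the key ...
spark.sql("""
    DELETE FROM catalog.schema.product_features
    WHERE product_id = 'PROD001'
""")

# 2) ... then append the new rows for that key.
(new_exploded_df
    .write.format("delta").mode("append")
    .saveAsTable("catalog.schema.product_features"))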

New data will now be ingested into the pipeline:

incoming_data_df = spark.createDataFrame([
    ('PROD001', ['size:large', 'material:cotton'], '2024-01-02 15:00:00'),
    ('PROD002', ['size:small'], '2024-01-02 15:00:00')
], ["product_id", "feature_array", "updated_at"])


incoming_data_df.write.mode("overwrite").saveAsTable("catalog.schema.source_streaming_table")
spark.table("catalog.schema.source_streaming_table").display()
+----------+--------------------+-------------------+
|product_id|       feature_array|         updated_at|
+----------+--------------------+-------------------+
|   PROD001|[size:large, mate...|2024-01-02 15:00:00|
|   PROD002|        [size:small]|2024-01-02 15:00:00|
+----------+--------------------+-------------------+

For this new data, we expect the color row for PROD001 to be implicitly deleted, and likewise the color row for PROD002, because the incoming arrays no longer contain that feature.

Define a foreachBatch Function to Replace Data by Key

The approach is to write a foreachBatch function that performs the explode, deletes all rows whose keys appear in the incoming batch, and then appends the newly exploded rows. foreachBatch is needed because we have to run custom logic (explode, delete, append) against the target Delta table for each micro-batch. Note that this function is idempotent, so it can safely be re-run if a batch fails: deleting records that are not there is a no-op, and the last step is a single write, so a retry will not result in more than one write. We can make the idempotency guarantee even stronger by setting the txnAppId and txnVersion options on the batch write inside foreachBatch.

from pyspark.sql.functions import explode, split, col
from delta.tables import DeltaTable


app_id = "12345678"


def process_batch(batch_df, batch_id):
    # Step 1: Explode the incoming batch and parse the "name:value" features
    exploded = batch_df \
        .withColumn("feature_item", explode("feature_array")) \
        .withColumn("feature_name", split(col("feature_item"), ":")[0]) \
        .withColumn("feature_value", split(col("feature_item"), ":")[1]) \
        .select("product_id", "feature_name", "feature_value", "updated_at")

    # Step 2: Get unique product_ids in this micro-batch
    keys_to_replace = exploded.select("product_id").distinct()

    # Step 3: Delete old records for these product_ids in the target Delta table
    delta_table = DeltaTable.forName(spark, "catalog.schema.product_features")
    delta_table.alias("target").merge(
          keys_to_replace.alias("source"),
          "target.product_id = source.product_id"
      ).whenMatchedDelete().execute()

    # Step 4: Insert new exploded records; txnAppId/txnVersion make the append idempotent
    exploded.write.format("delta") \
        .option("txnAppId", app_id) \
        .option("txnVersion", batch_id) \
        .mode("append") \
        .saveAsTable("catalog.schema.product_features")
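As a small variation, Step 3 inside the function could also be done with DeltaTable.delete() and an IN-style predicate instead of a merge. A sketch that reuses keys_to_replace and delta_table from the function above, assuming the number of distinct keys per micro-batch is small enough to collect to the driver:

# Alternative delete for Step 3: collect the batch's keys and delete directly.
# Assumes the distinct key list per micro-batch is small enough to collect.
key_list = [row["product_id"] for row in keys_to_replace.collect()]
if key_list:  # guard against an empty micro-batch
    delta_table.delete(col("product_id").isin(key_list))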

We will then apply this foreachBatch function to each micro-batch processed by our Structured Streaming query:

(spark.readStream 
  .table("catalog.schema.source_streaming_table") # or .format("cloudFiles")/Kafka/etc. 
  .writeStream
  .foreachBatch(process_batch)
  .outputMode("append")
  .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/features")
  .trigger(availableNow=True)
  .start()
);
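Once the availableNow trigger finishes, read the target table back to check the final state:

# Inspect the final state of the target table after the stream completes
spark.table("catalog.schema.product_features").display()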

Result

+----------+------------+-------------+-------------------+
|product_id|feature_name|feature_value|         updated_at|
+----------+------------+-------------+-------------------+
|   PROD001|        size|        large|2024-01-02 15:00:00|
|   PROD001|    material|       cotton|2024-01-02 15:00:00|
|   PROD002|        size|        small|2024-01-02 15:00:00|
|   PROD003|       brand|   databricks|2024-01-02 15:00:00|
|   PROD003|    category|        shoes|2024-01-02 15:00:00|
|   PROD003|       color|        white|2024-01-02 15:00:00|
|   PROD003|        size|           10|2024-01-02 15:00:00|
+----------+------------+-------------+-------------------+


Sure enough, the color feature was deleted for both PROD001 and PROD002, so the code works as intended.

Key Takeaways

  • The explode() function is the essential first transformation for array/nested data; however, it produces rows in a nondeterministic order, so a Materialized View downstream of an Auto CDC flow in Declarative Pipelines cannot be processed incrementally. Also be cautious when exploding very large arrays, since every element becomes a row; use appropriately sized, memory-optimized instances to avoid spilling to disk.
  • This pattern (delete all by key, insert new exploded rows) is a group-wise overwrite, so it’s not your average SCD operation.
  • The approach works seamlessly and incrementally in Databricks Delta Lake via PySpark, as long as you write your own foreachBatch function to handle the deletes and inserts.

Here’s the full notebook for reference: https://github.com/howardwu-db/Manual-SCD-with-Explode/tree/main