a week ago
Just published a new video in the Databricks Performance Series that tries to clearly explain how DataFrame caching over Delta Tables behaves when updates are performed on the underlying table. I came across this use case in a recent project and struggled a bit until I completely understood it, so I'm sharing it with the community to shed some light on it.
I hope it helps, and I'd also welcome feedback on any further considerations I didn't take into account.
Ping me if you need source code.
Thanks!
Wednesday - last edited Wednesday
@Coffee77 thanks for the knowledge sharing!
I see in the YT vid you're using the .persist() method within a notebook, cool stuff! Does this strategy work for users who don't use PySpark, i.e. SQL-heavy folk? What would their strategy be? 😎
There are also other variations of cache used within Databricks. I stumbled across this article last week: https://docs.databricks.com/aws/en/sql/user/queries/query-caching
It's certainly worth a read 🙂.
All the best,
BS
Wednesday
✅ Source Code used in sample here:
# Databricks notebook source
################################################################################################
# 1) Function to create sample DataFrame and overwrite results in "people_table" Delta Table
################################################################################################
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
def create_or_reset_sample_df() -> DataFrame:
    schema = StructType([
        StructField("Id", IntegerType(), False),
        StructField("Name", StringType(), True),
        StructField("Surname", StringType(), True),
        StructField("Age", IntegerType(), True)
    ])
    data = [
        (1, "Alice", "Smith", 25),
        (2, "Bob", "Jordan", 30),
        (3, "Charlie", "Durant", 35)
    ]
    df = spark.createDataFrame(data, schema)
    df.write.mode("overwrite").saveAsTable("people_table")
    return df

def print_separator():
    print("=" * 100)
create_or_reset_sample_df().show()
# COMMAND ----------
################################################################################################
# 2) Inspect plans WITHOUT caching
################################################################################################
create_or_reset_sample_df()
# Get Dataframe from Delta Table
df = spark.sql("select * from people_table where Age >= 18")
# Action 1
print(f"Total People: {df.count()}\n")
df.explain()
# Filter and Action 2
name1 = 'Alice'
df1 = df.filter(f"Name = '{name1}' ")
print(f"Number of people with name '{name1}': {df1.count()}\n")
df1.explain()
# Filter and Action 3
name2 = 'Bob'
df2 = df.filter(f"Name = '{name2}'")
print(f"Number of people with name '{name2}': {df2.count()}\n")
df2.explain()
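# In each of the three plans above, look for the scan node over the Delta table's parquet files
# ('FileScan parquet' / 'Scan parquet'): nothing is cached yet, so every action triggers its own read.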
print_separator()
print("CONCLUSION: All DFs (df, df1, df2) are created based on scans over parquet files from delta table")
print("IMPORTANT: Up to 3 different queries are performed over original delta table due to Lazy Evaluation")
print_separator()
# COMMAND ----------
################################################################################################
# 3) Inspect Plans WITHOUT caching BUT replaced 'df' by original Spark SQL Query
################################################################################################
create_or_reset_sample_df()
# Get Dataframe from Delta Table
df = spark.sql("select * from people_table where Age >= 18")
# Action 1
print(f"Total People: {df.count()}\n")
df.explain()
# Filter and Action 2 - Reference to 'df' replaced by original query
name1 = 'Alice'
df1 = spark.sql("select * from people_table where Age >= 18").filter(f"Name = '{name1}' ")
print(f"Number of people with name '{name1}': {df1.count()}\n")
df1.explain()
# Filter and Action 3 - Reference to 'df' replaced by original query
name2 = 'Bob'
df2 = spark.sql("select * from people_table where Age >= 18").filter(f"Name = '{name2}'")
print(f"Number of people with name '{name2}': {df2.count()}\n")
df2.explain()
print_separator()
print("CONCLUSION: We get exactly same plans as SAMPLE 2")
print_separator()
# COMMAND ----------
################################################################################################
# 4) Inspect plans WITH caching (.persist() or .cache() methods)
################################################################################################
create_or_reset_sample_df()
# Get Dataframe from Delta Table
df = spark.sql("select * from people_table where Age >= 18")
# Action 1
print(f"Total People: {df.count()}\n")
df.explain()
# Persist Dataframe
df.persist()
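# Note: persist() is lazy and, with no arguments, typically defaults to MEMORY_AND_DISK for DataFrames;
# the cache is actually materialized by the next action (the first count() below).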
# Filter and Action 2 - 'df' already cached / persisted
name1 = 'Alice'
df1 = df.filter(f"Name = '{name1}' ")
print(f"Number of people with name '{name1}': {df1.count()}\n")
df1.explain()
# Filter and Action 3 - 'df' already cached / persisted
name2 = 'Bob'
df2 = df.filter(f"Name = '{name2}'")
print(f"Number of people with name '{name2}': {df2.count()}\n")
df2.explain()
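# The two plans above should now read from the cached data (an InMemoryRelation / InMemoryTableScan node)
# instead of re-scanning the Delta table's parquet files.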
# Unpersist Dataframe
df.unpersist()
print_separator()
print(f"CONCLUSION: After caching ('persist()') rest of DFs are created based on cached 'df'")
print(f"IMPORTANT: Only one query is performed over parquet files from original delta table")
print_separator()
# COMMAND ----------
########################################################################################################################
# 5) WHAT IF underlying Delta Table is updated? (Cluster Runtime: 16.4 LTS)
# A) Update underlying Delta Table with rows NOT MATCHING 'where' clauses used to create DFs
########################################################################################################################
create_or_reset_sample_df()
# Get DataFrame from Delta Table
df = spark.sql("select * from people_table where Age >= 18")
# Set DataFrame to be persisted
df.persist()
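# Reminder: persist() is lazy, so the cache is materialized by the first count() below.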
# Insert a new record into the underlying Delta Table over which the DataFrame was cached.
# The new record (4, 'Bob', 'Carrey', 15) does NOT match the 'where Age >= 18' filter, as Age = 15
total_people_before_insert = df.count()
print(f"Total People in Cached DataFrame BEFORE inserting new record in underlying Delta Table: {total_people_before_insert}")
spark.sql("insert into people_table values(4,'Bob', 'Carrey', 15)")
total_people_after_insert = df.count()
print(f"Total People in Cached DataFrame AFTER inserting new record in underlying Delta Table: {total_people_after_insert}")
print(f"Inserted Record (4,'Bob', 'Carrey', 15) in underlying Delta Table IS NOT present in Cached DataFrame as 15 does not match 'Age >= 18' where clause")
df.show()
# As the inserted record does not match the 'where' filter, we do not expect the cache to be invalidated / refreshed; the cached data stays as it was
assert(total_people_before_insert == total_people_after_insert)
# Filter and action for name 'Bob'
name = 'Bob'
df2 = df.filter(f"Name = '{name}'")
total_people_with_name = df2.count()
print(f"People with name '{name}': {df2.count()}")
assert(total_people_with_name == 1)
# Unpersist DataFrame
df.unpersist()
print_separator()
print(f"CONCLUSION: If new inserted record in underlying Delta Table does not match 'where' clause as from which Cached DataFrame was generated -> CACHE is NOT REFRESHED")
print_separator()
# COMMAND ----------
########################################################################################################################
# 5) WHAT IF underlying Delta Table is updated? (Cluster Runtime: 16.4 LTS)
# B) Update underlying Delta Table with rows MATCHING 'where' clauses used to create DFs
########################################################################################################################
create_or_reset_sample_df()
# Get DataFrame from Delta Table
df = spark.sql("select * from people_table where Age >= 18")
# Set DataFrame to be persisted
df.persist()
# Insert a new record into the underlying Delta Table over which the DataFrame was cached.
# The new record (4, 'Bob', 'Wilkins', 45) MATCHES the 'where Age >= 18' filter, as Age = 45
total_people_before_insert = df.count()
print(f"Total People in Cached DataFrame BEFORE inserting new record in underlying Delta Table: {total_people_before_insert}")
spark.sql("insert into people_table values(4,'Bob', 'Wilkins', 45)")
total_people_after_insert = df.count()
print(f"Total People in Cached DataFrame AFTER inserting new record in underlying Delta Table: {total_people_after_insert}")
print(f"New inserted record (4,'Bob', 'Wilkins', 45) into underlying Delta Table IS present in Cached DataFrame as 45 matches 'Age >= 18' where clause")
df.show()
# As the inserted record matches the 'where' filter, we expect the cache to be invalidated / refreshed.
assert(total_people_before_insert < total_people_after_insert)
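# The assert above holds because the write to the Delta table from the same cluster invalidated the cached
# data, so the count() after the INSERT re-scanned the table (see the CONCLUSION / IMPORTANT prints below).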
# Filter and Action for name 'Bob'
name = 'Bob'
df2 = df.filter(f"Name = '{name}'")
total_people_with_name = df2.count()
print(f"People with name '{name}': {df2.count()}")
assert(total_people_with_name == 2) # We expect new record to appear in cached DataFrame as a result of cache invalidation
# Unpersist DataFrame
df.unpersist()
print_separator()
print("CONCLUSION: If new inserted record in underlying Delta Table MATCHES 'where' clause as from which Cached DF was generated -> CACHE is INVALIDATED / REFRESHED.")
print("IMPORTANT: Cached DataFrame is only refreshed if Delta Table update (INSERT, DELETE, UPDATE, MERGE) is performed IN SAME CLUSTER where DF was cached.")
print("IMPORTANT: All of this is only valid if Cluster Runtime Version is >= 12.2 LTS. Otherwise, Cached DataFrame remains stale.")
print_separator()
Wednesday
Added source code in video comments if needed 😀
Wednesday
Interesting question about SQL indeed, @BS_THE_ANALYST. I haven't dealt with it until now, but I'll take a look at that article; it looks great, thanks. There are a lot of flavours of "caches", and knowing "what", "where" and "how" data is cached and/or invalidated under different scenarios is not always easy to understand.
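In the meantime, for the SQL-heavy folk, plain Spark SQL also has CACHE TABLE / UNCACHE TABLE statements that give a similar effect on a cluster (I haven't checked SQL warehouses, and this is different from the query-result caching described in that article). A minimal sketch against the same people_table from the video; "adults" is just an illustrative name, and the statements are wrapped in spark.sql() only to keep everything in Python, they can be run directly in a %sql cell too:
# Cache the filtered result once; LAZY delays materialization until the first query that needs it.
spark.sql("CACHE LAZY TABLE adults AS SELECT * FROM people_table WHERE Age >= 18")
# Subsequent SQL queries against 'adults' are served from the cache instead of re-scanning the Delta table.
spark.sql("SELECT COUNT(*) AS total FROM adults WHERE Name = 'Alice'").show()
spark.sql("SELECT COUNT(*) AS total FROM adults WHERE Name = 'Bob'").show()
# Release the cached data when done.
spark.sql("UNCACHE TABLE adults")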
Wednesday
Related to caching, @BS_THE_ANALYST: I didn't want to make the video more complex (it only shows the advantages of the persist() and cache() methods), but when retrieving data from Delta tables, due to lazy evaluation it is not 100% accurate to say the Delta table is always scanned from cloud storage. If you are running clusters whose worker nodes have "disk caching" enabled, parquet files are cached locally once retrieved, and this improves performance as well. The easiest way to use disk caching is to choose a worker type with SSD volumes when you configure your cluster. This is not always allowed for "develop" clusters xD
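If anyone wants to check or force it on a cluster, a minimal sketch (spark.databricks.io.cache.enabled is the switch I mean; whether it is on by default depends on the worker type and runtime):
# Check whether Databricks disk caching (the Delta/IO cache) is enabled for this session.
print(spark.conf.get("spark.databricks.io.cache.enabled", "not set"))
# Enable it explicitly; on SSD-backed worker types it is usually on by default.
spark.conf.set("spark.databricks.io.cache.enabled", "true")
# From here on, parquet files read from cloud storage are also kept on the workers' local disks,
# so re-reading people_table gets faster even without an explicit df.persist() / df.cache().
df = spark.sql("select * from people_table where Age >= 18")
print(df.count())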
Wednesday
@Coffee77 you've earned yourself a subscriber! I'll be following along with your content 🙂↔️🥳.
Thanks for providing the insight about this, I really appreciate it. I'll definitely look more into disk caching now.
All the best,
BS
Wednesday
Your subscription deserves a post and a video 😉 to clarify how to leverage "Databricks Disk Caching" (provided by DBR) and "Spark Caching" (provided by Spark) working together. In a nutshell, it is as you can see below:
Both can work as an L1/L2 cache in the cluster worker nodes, but only if the cloud VMs have SSDs (Solid State Drives). Otherwise, only Apache Spark caching can be used. I'll explain it later in detail, but take a look at the picture below summarizing the features of both caching types:
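To make the analogy a bit more concrete while I prepare that, a rough sketch of the read path under the same assumptions as above (persist() for the Spark cache, disk caching enabled on SSD-backed workers via the spark.databricks.io.cache.enabled flag mentioned earlier in the thread):
spark.conf.set("spark.databricks.io.cache.enabled", "true")  # "L2": disk cache on the workers' local SSDs
df = spark.sql("select * from people_table where Age >= 18")
df.persist()                                                 # "L1": Spark cache in executor memory/disk
df.count()      # first action: reads cloud storage and fills both layers
df.count()      # served from the Spark cache (fastest; no table scan in the plan)
df.unpersist()  # drop only the Spark cache...
df.count()      # ...the table is scanned again, but the parquet reads should hit the local disk cache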
I hope this helps!
Wednesday
AFAIK, L1/L2/L3 caches are not controlled by programs but by the CPU. Perhaps you mean something else?
Wednesday - last edited Wednesday
That is only a rough comparison: if a given DF is needed a second time, Spark caching is tried first (the fastest path); otherwise Databricks disk caching is used (the second fastest). Only if the data is missing from both caches is it read again from cloud storage (the slowest). This behavior is the important part. Is this ok for you?
Wednesday
It is all ok for me 🙂 I only wanted to point out that the terms L1/L2 cache are usually used for a different kind of cache, one not controlled by Spark.
Thursday
Thanks for your response @-werners- . Caching is really cool when used correctly, but it's not easy to get the full picture quickly in some scenarios, so any clarification that avoids confusion is always welcome 😉
Friday
Source Code with samples available at https://github.com/CafeConData/Spark-Caching-on-Delta-Tables