2 weeks ago
I have a Delta table that I keep in sync with a relational (SQL Server) table. The inserts and updates are easy, but checking for records to delete is prohibitively slow. I query the relational table for all primary key values, and any Delta table rows whose primary key doesn't exist in the relational table get deleted from the Delta table. The following job takes about 10 minutes for the 700-million-row table.
pks = spark.read.format("jdbc").option("query", "SELECT pk FROM sql_table_name").load()  # JDBC url/credential options omitted
delta_table = spark.read.table(delta_table_name)
r = delta_table.filter(~delta_table["pk"].isin([row.pk for row in pks.collect()]))
display(r)
Is this just something I should expect to take a long time, or is there a meaningfully more efficient way to do this? The Delta table uses liquid clustering, partitioned on lower cardinality columns. I am doing this in a PySpark notebook at the moment.
Thanks!
2 weeks ago
Passing a large list of values to isin() in PySpark is inefficient. The values have to be collected into an in-memory list on the driver and embedded in the query, which becomes resource-intensive and doesn't take advantage of distributed computation. Replacing it with a JOIN or MERGE keeps the work distributed and should perform much better.
Instead of using isin(), load the primary keys from the source table and run a MERGE against the main Delta table that deletes every target row with no matching key in the source. For smaller datasets, consider broadcasting the smaller table for efficiency. Here's an example:
from delta.tables import DeltaTable
# Load every primary key that still exists in SQL Server
# (JDBC url and credential options omitted)
source_pks = spark.read.format("jdbc").option("query", "SELECT pk FROM sql_table").load()
# Delete Delta rows whose pk has no match in the source
# (whenNotMatchedBySourceDelete requires Delta Lake 2.3+ / Databricks Runtime 12.2 LTS+)
delta_table = DeltaTable.forName(spark, "delta_table_name")
delta_table.alias("main").merge(
    source_pks.alias("src"),
    "main.pk = src.pk"
).whenNotMatchedBySourceDelete().execute()
For more details, refer to the Databricks documentation on Delta Lake MERGE and on optimizing Delta tables.
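To illustrate why a join-based approach scales where a huge isin() list does not, here is a toy, pure-Python model of a hash-partitioned anti join, roughly how a shuffle-based join splits the work: both sides are partitioned by pk, and each partition only builds a hash set from its own slice of the source keys, so no single process has to hold every key. This is an illustration of the idea, not Spark code; the function names, partition count, and sample keys are hypothetical.

```python
from collections import defaultdict


def hash_partition(keys, num_partitions):
    """Group keys by hash(key) % num_partitions, like a shuffle on pk."""
    parts = defaultdict(list)
    for k in keys:
        parts[hash(k) % num_partitions].append(k)
    return parts


def anti_join(target_keys, source_keys, num_partitions=4):
    """Return target keys with no matching source key (the rows to delete)."""
    target_parts = hash_partition(target_keys, num_partitions)
    source_parts = hash_partition(source_keys, num_partitions)
    to_delete = []
    # Partitions are independent: on a cluster each one would be a separate task,
    # holding only its own slice of the source keys in a hash set.
    for p in range(num_partitions):
        source_set = set(source_parts.get(p, []))
        to_delete.extend(k for k in target_parts.get(p, []) if k not in source_set)
    return sorted(to_delete)


if __name__ == "__main__":
    delta_pks = [1, 2, 3, 4, 5, 6]  # hypothetical keys in the Delta table
    sql_pks = [1, 3, 4, 6]          # hypothetical keys still in SQL Server
    print(anti_join(delta_pks, sql_pks))  # → [2, 5]
```

In PySpark the same anti join can be written as delta_table.join(pks, "pk", "left_anti"), using the DataFrames from the question, which returns the Delta rows whose keys no longer exist in SQL Server.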
2 weeks ago
Delta Lake never modifies Parquet files in place: every write operation adds new files and records a new table version. To improve performance, you can OPTIMIZE the table, which rewrites its Parquet files behind the scenes to improve the data layout (read more about OPTIMIZE here: https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-optimize). It is a simple command: OPTIMIZE <Table Name>
You can also run the VACUUM command to remove data files that are no longer referenced by the table and are older than the retention threshold, which frees up storage space: https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-vacuum
a week ago
Let's look at why this code is slow when Spark runs it against the Delta table.
pks = spark.read.format("jdbc").option("query", "SELECT pk FROM sql_table_name").load()  # JDBC url/credential options omitted
delta_table = spark.read.table(delta_table_name)
r = delta_table.filter(~delta_table["pk"].isin([row.pk for row in pks.collect()]))
display(r)
Line 1: Reads every primary key of the given table from the OLTP system (SQL Server) into a DataFrame.
Line 2: Reads the Delta table.
Line 3: This line is what makes the job slow. It collects every value from the pks DataFrame into a Python list on the driver and passes that list to the filter on the Delta table DataFrame. Looping over hundreds of millions of rows to build that list, and then shipping it inside the query, consumes a lot of both compute and memory.
Solution:
You can maintain a delete-log table in the SQL Server (OLTP) instance and use it only to drive deletes in your Delta table. Assuming you are not soft-deleting data in the SQL table, a deleted row is gone for good, so you can create a trigger in SQL Server that logs the primary key of each deleted row, and the sync job can read that log instead of comparing every key.
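The trigger idea can be tried locally with Python's built-in sqlite3 module as a stand-in for SQL Server. This is a minimal sketch, assuming a hypothetical delete_log table with pk and deleted_at columns; SQLite's trigger syntax differs from T-SQL, where you would write a statement-level AFTER DELETE trigger that reads the deleted pseudo-table.

```python
import sqlite3


def build_demo_db():
    """In-memory stand-in for the SQL Server table plus a hypothetical delete log."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE sql_table (pk INTEGER PRIMARY KEY, payload TEXT);

        -- Hypothetical log table: one row per deleted primary key.
        CREATE TABLE delete_log (
            pk INTEGER NOT NULL,
            deleted_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        -- SQLite row-level trigger; SQL Server would instead use
        -- AFTER DELETE ... INSERT INTO delete_log (pk) SELECT pk FROM deleted
        CREATE TRIGGER log_deletes AFTER DELETE ON sql_table
        FOR EACH ROW
        BEGIN
            INSERT INTO delete_log (pk) VALUES (OLD.pk);
        END;
    """)
    return conn


def pending_deletes(conn):
    """Keys the sync job would delete from the Delta table."""
    rows = conn.execute("SELECT pk FROM delete_log ORDER BY pk").fetchall()
    return [r[0] for r in rows]


if __name__ == "__main__":
    conn = build_demo_db()
    conn.executemany(
        "INSERT INTO sql_table (pk, payload) VALUES (?, ?)",
        [(i, f"row {i}") for i in range(1, 6)],
    )
    conn.execute("DELETE FROM sql_table WHERE pk IN (2, 4)")
    print(pending_deletes(conn))  # → [2, 4]
```

On the Databricks side, the job would then read only the log rows added since the last sync (for example, filtered on deleted_at) and delete those keys from the Delta table, so each run touches a handful of keys rather than all 700 million.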