<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Deleting records from Delta table that are not in relational table in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103370#M41420</link>
    <description>&lt;P&gt;The isin() function in PySpark is inefficient for large datasets: every primary-key value has to be collected onto the driver, and the resulting filter becomes one enormous IN-list that Spark cannot evaluate in a distributed way. Replacing it with a JOIN or MERGE keeps the comparison distributed and performs far better.&lt;/P&gt;&lt;P&gt;Instead of using isin(), load the primary keys that still exist in SQL Server and use them as the source of a MERGE against the Delta table: any Delta row whose key is no longer matched by the source gets deleted. For smaller key sets, consider broadcasting the source table. Here’s an example:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from delta.tables import DeltaTable

# Load the primary keys that currently exist in the SQL Server table
surviving_pks = spark.read.format("jdbc").option("query", "SELECT pk FROM sql_table").load()

# Delete Delta rows whose pk no longer exists in the source
# (whenNotMatchedBySourceDelete requires Delta Lake 2.3+)
delta_table = DeltaTable.forName(spark, "delta_table_name")
delta_table.alias("main").merge(
    surviving_pks.alias("src"),
    "main.pk = src.pk"
).whenNotMatchedBySourceDelete().execute()&lt;/LI-CODE&gt;&lt;P&gt;This approach scales better and fully utilizes Spark’s distributed capabilities.&lt;/P&gt;&lt;P&gt;For more details, refer to the Databricks documentation on Delta Lake MERGE and on optimizing Delta tables; both provide additional guidance on using Delta Lake effectively for large-scale data management.&lt;/P&gt;</description>
    <pubDate>Sat, 28 Dec 2024 01:39:21 GMT</pubDate>
    <dc:creator>fmadeiro</dc:creator>
    <dc:date>2024-12-28T01:39:21Z</dc:date>
    <item>
      <title>Deleting records from Delta table that are not in relational table</title>
      <link>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103369#M41419</link>
      <description>&lt;P&gt;I have a Delta table that I keep in sync with a relational (SQL Server) table. The inserts and updates are easy, but checking for records to delete is prohibitively slow. I query the relational table for all primary key values, and any primary keys in the Delta table that don't exist in the relational table get deleted from the Delta table. The following job takes about 10 minutes for the 700 million record table.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;pks = spark.read.format("jdbc").option("query", "SELECT pk FROM sql_table_name").load()
delta_table = spark.read.table(delta_table_name)
r = delta_table.filter(~col("pk").isin([row.pk for row in pks.collect()]))
display(r)&lt;/LI-CODE&gt;&lt;P&gt;Is this just something I should expect to take a long time, or is there a meaningfully more efficient way to do this? The Delta table uses liquid clustering on lower-cardinality columns. I am doing this in a PySpark notebook at the moment.&lt;BR /&gt;Thanks!&lt;/P&gt;</description>
      <pubDate>Fri, 27 Dec 2024 22:40:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103369#M41419</guid>
      <dc:creator>h2p5cq8</dc:creator>
      <dc:date>2024-12-27T22:40:47Z</dc:date>
    </item>
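The deletion check being described is, at its core, an anti-join: find the keys present in the Delta table that no longer appear in SQL Server. A minimal in-memory sketch of that semantics (plain Python rather than Spark, with hypothetical key values):

```python
# Tiny in-memory illustration of the deletion check described above:
# the rows to delete are the primary keys present in the Delta table
# but absent from the SQL table -- a set difference (anti-join).
delta_pks = {101, 102, 103, 104, 105}   # keys currently in the Delta table
sql_pks = {101, 103, 105}               # keys still in SQL Server

rows_to_delete = delta_pks - sql_pks    # anti-join semantics
print(sorted(rows_to_delete))           # [102, 104]
```

In Spark the same set difference is expressed as a left-anti join (or a MERGE), so the comparison runs on the executors instead of a collected list on the driver.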
    <item>
      <title>Re: Deleting records from Delta table that are not in relational table</title>
      <link>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103370#M41420</link>
      <description>&lt;P&gt;The isin() function in PySpark is inefficient for large datasets: every primary-key value has to be collected onto the driver, and the resulting filter becomes one enormous IN-list that Spark cannot evaluate in a distributed way. Replacing it with a JOIN or MERGE keeps the comparison distributed and performs far better.&lt;/P&gt;&lt;P&gt;Instead of using isin(), load the primary keys that still exist in SQL Server and use them as the source of a MERGE against the Delta table: any Delta row whose key is no longer matched by the source gets deleted. For smaller key sets, consider broadcasting the source table. Here’s an example:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from delta.tables import DeltaTable

# Load the primary keys that currently exist in the SQL Server table
surviving_pks = spark.read.format("jdbc").option("query", "SELECT pk FROM sql_table").load()

# Delete Delta rows whose pk no longer exists in the source
# (whenNotMatchedBySourceDelete requires Delta Lake 2.3+)
delta_table = DeltaTable.forName(spark, "delta_table_name")
delta_table.alias("main").merge(
    surviving_pks.alias("src"),
    "main.pk = src.pk"
).whenNotMatchedBySourceDelete().execute()&lt;/LI-CODE&gt;&lt;P&gt;This approach scales better and fully utilizes Spark’s distributed capabilities.&lt;/P&gt;&lt;P&gt;For more details, refer to the Databricks documentation on Delta Lake MERGE and on optimizing Delta tables; both provide additional guidance on using Delta Lake effectively for large-scale data management.&lt;/P&gt;</description>
      <pubDate>Sat, 28 Dec 2024 01:39:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103370#M41420</guid>
      <dc:creator>fmadeiro</dc:creator>
      <dc:date>2024-12-28T01:39:21Z</dc:date>
    </item>
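To make the MERGE clause concrete, here is a plain-Python simulation (not Spark, hypothetical data) of what "when not matched by source, delete" does to the target table:

```python
# Plain-Python simulation of MERGE ... WHEN NOT MATCHED BY SOURCE THEN DELETE:
# target rows whose key is NOT matched by any source key are removed.
target = {1: "alice", 2: "bob", 3: "carol"}   # Delta table: pk -> row
source_pks = {1, 3}                            # keys still present in SQL Server

merged = {pk: row for pk, row in target.items() if pk in source_pks}
print(merged)  # {1: 'alice', 3: 'carol'}
```

Row 2 ("bob") disappears because no source key matches it, which is exactly the deletion the question is trying to achieve at scale.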
    <item>
      <title>Re: Deleting records from Delta table that are not in relational table</title>
      <link>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103375#M41422</link>
      <description>&lt;P&gt;Delta Lake always creates a new version of parquet files whenever any operation is performed. In order to have a better performance,&amp;nbsp;you can Optimize the table which rewrites the parquet files for that table behind the scenes to improve the data layout (Read more about optimize here: &lt;A href="https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-optimize" target="_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-optimize&lt;/A&gt;). It is a simple command: &lt;STRONG&gt;&lt;EM&gt;OPTIMIZE &amp;lt;Table Name&amp;gt;&lt;/EM&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Also, you can run the VACUUM command to clean up old versions of the data&amp;nbsp;and free up storage space: &lt;A href="https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-vacuum" target="_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-vacuum&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Sat, 28 Dec 2024 04:21:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103375#M41422</guid>
      <dc:creator>karthickrs</dc:creator>
      <dc:date>2024-12-28T04:21:09Z</dc:date>
    </item>
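On a Databricks cluster these maintenance commands would be issued through spark.sql(); a minimal sketch of the statements involved (table name hypothetical):

```python
table = "delta_table_name"  # hypothetical table name

optimize_sql = f"OPTIMIZE {table}"               # rewrite files to improve layout
vacuum_sql = f"VACUUM {table} RETAIN 168 HOURS"  # 168 hours = the 7-day default retention

# On a cluster these would be executed as:
#   spark.sql(optimize_sql)
#   spark.sql(vacuum_sql)
print(optimize_sql)
print(vacuum_sql)
```

Note that VACUUM only reclaims storage from old file versions; it does not by itself speed up the key-comparison logic in the original job.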
    <item>
      <title>Re: Deleting records from Delta table that are not in relational table</title>
      <link>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103434#M41439</link>
      <description>&lt;P&gt;Let's look at the complexity behind this code when it is executed against the Delta table with Spark.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;pks = spark.read.format("jdbc").option("query", "SELECT pk FROM sql_table_name").load()
delta_table = spark.read.table(delta_table_name)
r = delta_table.filter(~col("pk").isin([row.pk for row in pks.collect()]))
display(r)&lt;/LI-CODE&gt;&lt;P&gt;&lt;FONT size="3"&gt;&lt;STRONG&gt;Line 1:&lt;/STRONG&gt; Reads all primary keys from the given table in the OLTP system into a dataframe.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;&lt;STRONG&gt;Line 2:&lt;/STRONG&gt; Reads the Delta table.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;&lt;STRONG&gt;Line 3:&lt;/STRONG&gt; &lt;FONT color="#FF0000"&gt;This line is what causes the complexity&lt;/FONT&gt;: it collects the &lt;EM&gt;pks&lt;/EM&gt; dataframe into a Python list on the driver and passes that list to the filter on the Delta table dataframe. Looping through every row of &lt;EM&gt;pks&lt;/EM&gt; to build the list of PK values consumes both compute and memory on the driver and drives up the runtime.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;U&gt;&lt;FONT size="4"&gt;Solution:&lt;/FONT&gt;&lt;/U&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="4"&gt;Maintain a delete-log table in the SQL instance (OLTP) and use it solely to drive deletes in your Delta table. Assuming you are not soft-deleting data in the SQL table, a deleted row is gone for good, so you can create a trigger in SQL Server that records each deleted row in the log, and the sync job can then leverage that log instead of comparing every primary key.&lt;/FONT&gt;&lt;/P&gt;</description>
      <pubDate>Sun, 29 Dec 2024 07:24:05 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/deleting-records-from-delta-table-that-are-not-in-relational/m-p/103434#M41439</guid>
      <dc:creator>hari-prasad</dc:creator>
      <dc:date>2024-12-29T07:24:05Z</dc:date>
    </item>
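The delete-log idea can be sketched in plain Python (hypothetical data): the SQL Server trigger appends deleted keys to a log table, and the sync job replays only that log against the replica instead of re-scanning all primary keys:

```python
# Replica of the Delta table keyed by pk (hypothetical data)
replica = {1: "a", 2: "b", 3: "c", 4: "d"}

# Delete log populated by a SQL Server trigger: keys removed since last sync
delete_log = [2, 4]

# Replay the log: work proportional to the number of deletes,
# instead of comparing all 700M primary keys on every run
for pk in delete_log:
    replica.pop(pk, None)

print(sorted(replica))  # [1, 3]
```

The trade-off is operational: the OLTP side must own the trigger and the log table, and the log needs periodic truncation after each successful sync.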
  </channel>
</rss>

