02-27-2023 12:29 AM
I have a Delta table that is partitioned by year, month and day. I'm trying to merge data into it on all three partition columns plus an extra column (an ID). My merge statement is below:
MERGE INTO delta.<path of delta table> oldData
USING df newData
ON oldData.year = '2023' AND oldData.month = '10' AND oldData.day = '12'
  AND oldData.clientid = newData.clientid
WHEN MATCHED THEN DELETE
When I do an explain on this query I get this plan:
== Physical Plan ==
Execute MergeIntoCommandEdge
+- MergeIntoCommandEdge SubqueryAlias newData, SubqueryAlias oldData, Delta[version=4, ... , (((year#3761220 = 2023) AND (month#3761221 = 10)) AND ((day#3761222 = 12) AND (clientid#3761212 = clientid#3532751)))
This query runs for a long time considering the data I'm trying to process is small (I think less than 1M rows). Also, based on this link I should see some mention of the partition in the physical plan, but I don't. Any ideas why my merge statement doesn't seem to use the partitions when it executes?
03-13-2023 05:00 AM
@Joceph Moreno : Based on the information you have provided, it seems like you are specifying the partition keys in the ON condition of the merge statement. However, you should be using the partition columns in the USING clause of the merge statement instead.
Can you check if the below works for you?
MERGE INTO delta.<path of delta table> oldData
USING (
SELECT * FROM df WHERE year = '2023' AND month = '10' AND day = '12'
) newData
ON oldData.clientid = newData.clientid
WHEN MATCHED THEN DELETE;
In this example, the partition columns (year, month, day) are used in the subquery in the USING clause to filter the data being merged. The partition columns are not included in the ON condition, as they are already being used to filter the data. Instead, the clientid column is used in the ON condition to match records between the old and new data. With this approach, the merge operation should only apply to the partition containing the data that matches the filter condition in the USING clause, which should improve performance.
Please let us know if this helps.
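For reference, here is a minimal sketch of driving the suggested statement from PySpark. It assumes an active SparkSession `spark` and that the source DataFrame has been registered as a temp view named `df`; the path, partition values, and `clientid` column are taken from the thread, and the helper name is made up for illustration.

```python
# Sketch only: builds the MERGE suggested above as a SQL string.
# `table_path` is a placeholder; the year/month/day values and the
# clientid join column mirror the thread's example.
def build_merge_sql(table_path: str, year: str, month: str, day: str) -> str:
    # Filter the source inside the USING subquery; match only on clientid.
    return f"""
        MERGE INTO delta.`{table_path}` oldData
        USING (
            SELECT * FROM df
            WHERE year = '{year}' AND month = '{month}' AND day = '{day}'
        ) newData
        ON oldData.clientid = newData.clientid
        WHEN MATCHED THEN DELETE
    """

# With a SparkSession and the source DataFrame registered as a view:
# df.createOrReplaceTempView("df")
# spark.sql(build_merge_sql("/mnt/delta/mytable", "2023", "10", "12"))
```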
Thursday
How would someone trigger this using pyspark and the python delta interface?
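One way this could look through the delta-spark Python API, as a sketch rather than a tested recipe: it assumes an active SparkSession `spark`, a source DataFrame `df`, and a table path; the condition strings mirror the thread's columns and values.

```python
# Sketch of the equivalent delete-on-match merge via the delta-spark
# Python API. All names here are assumptions from the thread.
PARTITION_FILTER = "year = '2023' AND month = '10' AND day = '12'"
MERGE_CONDITION = "oldData.clientid = newData.clientid"

def delete_matching_rows(spark, df, table_path):
    from delta.tables import DeltaTable  # requires the delta-spark package

    target = DeltaTable.forPath(spark, table_path)
    # Filter the source DataFrame, then merge and delete matched rows,
    # mirroring the SQL suggested earlier in the thread.
    (target.alias("oldData")
           .merge(df.where(PARTITION_FILTER).alias("newData"),
                  MERGE_CONDITION)
           .whenMatchedDelete()
           .execute())
```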
03-30-2023 01:24 PM
Isn't the suggested idea only filtering the input DataFrame (resulting in a smaller amount of data to match across the whole Delta table), rather than pruning the Delta table down to the relevant partitions to scan?
Thursday - last edited Thursday
Agree, the suggested idea filters the input DataFrame but does not directly trigger partition pruning on the Delta table.
@ghofigjong Partition pruning occurs when the query explicitly applies filters on the partition columns (year, month, day) to the Delta table (oldData). To achieve this, you need to ensure the filter is part of the Delta table's scan node, not just the input DataFrame. For example, include the filter directly in the Delta table query, like:
MERGE INTO delta.<path of delta table> oldData
USING (
  SELECT * FROM delta.<path of delta table>
  WHERE year = '2023' AND month = '10' AND day = '12'
) filteredOldData
ON oldData.clientid = filteredOldData.clientid
WHEN MATCHED THEN DELETE;
Then use EXPLAIN to confirm that the physical plan mentions partition pruning for oldData: you should see partition filters such as year = 2023 AND month = 10 AND day = 12 in the Delta table scan node.
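A rough way to inspect the plan text programmatically; this is a sketch, the helper name is made up, and the check is purely textual (it does not parse the plan):

```python
# Crude textual check over EXPLAIN output: does any line of the
# physical plan mention a predicate on the given partition column?
def plan_mentions_filter(plan_text: str, column: str) -> bool:
    return any(column in line and "=" in line
               for line in plan_text.splitlines())

# Usage with a SparkSession (assumption; `merge_sql` is your statement):
# plan = "\n".join(r[0] for r in spark.sql("EXPLAIN " + merge_sql).collect())
# plan_mentions_filter(plan, "year")
```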