Is there any way to overwrite a partition in delta table without specifying each and every partition in replace where? For non dated partitions, this is really a mess with delta tables.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-20-2022 03:37 AM
Is there any way to overwrite a partition in delta table without specifying each and every partition in replace where. For non dated partitions, this is really a mess with delta tables.
Most of my DE teams don't want to adopt delta because of these glitches.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-21-2022 04:05 AM
@Saurabh Verma , Delta is ACID and has a commit log, schema, etc., that's why it is different. So when you delete all data related to the partition, it will stay there as it still has a history. INSERT OVERWRITE can overwrite data only in a specified partition (but you need to specify it). The best way is to use MERGE when you have a unique id and partitions in the ON clause.
INSERT OVERWRITE TABLE part PARTITION (p1 = 'part1') VALUES(3)
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
06-06-2022 05:57 AM
Hi @Saurabh Verma following up did you get a chance to check @Hubert Dudek previous comments ?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
06-06-2022 07:33 AM
@Chetan Kardekar :
Hubert's answer is not relevant in this case. His answer might be correct if someone is performing data warehousing scenario's having primary keys etc. and completely using SQL. Programatic API code doesn't usually use the MERGE statements.
I have seen an open PULL request for this functionality and a lot of people are asking the same but not sure why engineering team is hesitating to implement it. I have written a hacky code to perform the relevant operations where the returned list of write partitions can be directly fed into the spark.write.format("delta").mode("overwrite").option("replaceWhere", writepar).save() statement.
def _get_partitions(spark: SparkSession, df, partition_col_list):
"""
Create a list of partitions in the input dataframe, based on partition columns.
:param spark: The spark session.
:param df: input dataframe.
:param partition_col_list: the list of partition column in input dataframe.
:return: List of partitions in the input dataframe
"""
for i in df.dtypes:
if i[1] in ('date', 'timestamp'):
df = df.withColumn(i[0], F.col(i[0]).cast(StringType()))
write_par_temp = df.select(partition_col_list).distinct().rdd.collect()
write_par_temp2 = str([str(el).replace("Row", "").replace("(", "").replace(")", "").replace(",", " AND") for el in
write_par_temp]).replace('[', "").replace(']', "").replace('"', '(').replace('(,', ') OR ')
len_par = len(write_par_temp2)
write_par = write_par_temp2[:len_par - 1] + ')'
print("write_par: {}".format(write_par))
return write_par

