01-13-2023 04:26 AM
I have a PySpark streaming pipeline that reads data from a Kafka topic; the data undergoes various transformations and is finally merged into a Databricks Delta table. In the beginning we were loading data into the Delta table by using the merge statement given below.
The incoming DataFrame inc_df had data for all partitions.
merge into main_db.main_delta_table main_dt USING inc_df df ON
main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND
main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND
main_dt.rule_read_start=df.rule_read_start AND
main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

We were executing the above query at the table level.
I have given a very basic diagram of the process in the image below.
But my Delta table is partitioned on continent and year. For example, this is what my partitioned Delta table looks like.
So I tried implementing the merge at the partition level and running the merge on multiple partitions in parallel, i.e. I created separate pipelines with partition-level filters in the queries. The image can be seen below.
merge into main_db.main_delta_table main_dt USING inc_df df ON
main_dt.continent in ('AFRICA') AND main_dt.year in ('202301') AND
main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND
main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND
main_dt.rule_read_start=df.rule_read_start AND
main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

But I am seeing a concurrency error:
com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [continent=AFRICA, year=2021] by a concurrent update. Please try the operation again.

I understand the error is telling me that it cannot update files concurrently. But I have a huge volume of data in production, and I don't want to perform the merge at the table level, where there are almost 1 billion records, without proper filters.
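(For context on why this exception appears: Delta's optimistic conflict checker only ignores a concurrent write when the merge condition itself proves the two transactions touch disjoint partitions, so the literal partition values have to appear in the condition string. A hypothetical helper for building such a condition — `partition_pruned_condition`, the `main_dt` alias, and the column names are illustrative assumptions, not an existing API:)

```python
def partition_pruned_condition(join_condition, partition_values, alias="main_dt"):
    """Append literal partition predicates to a merge join condition.

    partition_values maps partition column -> literal value, e.g.
    {"continent": "AFRICA", "year": "202301"}, so the resulting string
    lets Delta's conflict detection prune other partitions.
    """
    predicates = " AND ".join(
        f"{alias}.{col} = '{val}'" for col, val in partition_values.items()
    )
    return f"({join_condition}) AND {predicates}"
```

The returned string would then be passed as the `condition` of the merge, alongside an identical filter on the source DataFrame.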
Trial 2: I tried an alternate approach, but I am facing the same exception/error there as well.
Trial 3:
I also made another attempt with a different approach, following the `ConcurrentAppendException` section of the linked page.
base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
(
    base_delta.alias("main_dt")
    .merge(
        source=final_incremental_df.alias("df"),
        condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company AND main_dt.continent='Africa'"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

and
base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
(
    base_delta.alias("main_dt")
    .merge(
        source=final_incremental_df.alias("df"),
        condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company AND main_dt.continent='ASIA'"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

I ran the above merge operations in two separate pipelines, but I am still facing the same issue.
Could anyone let me know how I can design and optimise my streaming pipeline to merge data into a Delta table at the partition level, with multiple jobs running in parallel (each job on an individual partition)?
04-10-2023 07:41 AM
@bobbysidhartha :
When merging data into a partitioned Delta table in parallel, each job must only read and write files in its own partition, and the merge condition must say so explicitly. Delta uses optimistic concurrency control rather than partition-level locking: it raises ConcurrentAppendException whenever it cannot prove that two concurrent transactions touched disjoint files. Putting literal partition predicates (e.g. main_dt.continent = 'AFRICA' AND main_dt.year = '2021') into the merge condition, and filtering the source DataFrame the same way, lets the conflict checker rule out the partitions the other jobs are writing.
Here is an example code snippet to help you get started:
# Merge each partition with explicit partition predicates in the
# condition, so Delta's conflict detection can prove that concurrent
# merges touch disjoint partitions.
from delta.tables import DeltaTable

base_delta = DeltaTable.forPath(spark, "/path/to/delta_table")

# The equality conditions on the merge keys
join_condition = (
    "main_dt.continent = df.continent AND main_dt.str_id = df.str_id AND "
    "main_dt.rule_date = df.rule_date AND main_dt.rule_id = df.rule_id AND "
    "main_dt.rule_read_start = df.rule_read_start AND "
    "main_dt.company = df.company"
)

# Iterate through each partition handled by this job
for continent in ["AFRICA", "ASIA", "EUROPE"]:  # extend with your continents
    for year in ["2021", "2022", "2023"]:  # extend with your years
        # Restrict the source to the current partition so the merge
        # only reads and writes files inside it
        partition_filter = f"continent = '{continent}' AND year = '{year}'"
        partition_source = inc_df.where(partition_filter)
        (
            base_delta.alias("main_dt")
            .merge(
                source=partition_source.alias("df"),
                condition=(
                    f"main_dt.continent = '{continent}' AND "
                    f"main_dt.year = '{year}' AND "
                    + join_condition
                ),
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

Note that the above code is just an example and may need to be modified to fit your specific use case. Also, be sure to test this approach on a small subset of your data before running it on your entire dataset to ensure that it works as expected.
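Even with disjoint partitions, transient conflicts can still surface (for example from compaction or other writers on the same table), so retrying the merge with backoff is a common companion pattern. Below is a plain-Python sketch; `merge_with_retry` and the string-based exception check are illustrative assumptions, not a Databricks API (on Databricks you could catch ConcurrentAppendException directly):

```python
import random
import time

def merge_with_retry(do_merge, max_attempts=5, base_delay=1.0):
    """Run a merge callable, retrying on concurrent-write conflicts.

    do_merge is any zero-argument callable that performs the merge.
    Retries use exponential backoff with jitter so parallel jobs
    don't retry in lockstep.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return do_merge()
        except Exception as exc:
            # Only retry conflicts; re-raise everything else, and give
            # up once the attempt budget is exhausted.
            if "Concurrent" not in str(exc) or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1) * (1 + random.random()))
```

Each per-partition merge in the loop above could then be wrapped as `merge_with_retry(lambda: ...)`.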
12-23-2024 04:53 AM - edited 12-23-2024 04:54 AM
This reply seems to have been AI-generated. I would not approve this one.