I have a PySpark streaming pipeline that reads data from a Kafka topic. The data goes through various transformations and is finally merged into a Databricks Delta table. Initially we loaded data into the Delta table using the merge statement given below.
The incoming dataframe inc_df had data for all partitions.
merge into main_db.main_delta_table main_dt USING inc_df df ON
main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND
main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND
main_dt.rule_read_start=df.rule_read_start AND
main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
We were executing the above query at the table level.
I have given a very basic diagram of the process in the image below.
However, my Delta table is partitioned on continent and year. For example, this is what my partitioned Delta table looks like.
So I tried implementing the merge at the partition level, running the merge on multiple partitions in parallel, i.e. I created separate pipelines whose queries filter on a specific partition. The image can be seen below.
merge into main_db.main_delta_table main_dt USING inc_df df ON
main_dt.continent in ('AFRICA') AND main_dt.year in ('202301') AND
main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND
main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND
main_dt.rule_read_start=df.rule_read_start AND
main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
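For reference, a partition-scoped ON clause like the one above can be assembled programmatically, so that every parallel job pins the target's partition columns to literal values in the same way. This is only a minimal sketch: `build_merge_condition` and its parameters are hypothetical names for illustration, not part of Delta Lake, and it assumes the partition values are compared as string literals, as in the query above.

```python
def build_merge_condition(join_keys, partition_filters, target="main_dt", source="df"):
    """Return a MERGE ON-clause string (hypothetical helper, not a Delta API).

    partition_filters maps a partition column of the target table to a list
    of literal values, e.g. {"continent": ["AFRICA"], "year": ["202301"]}.
    """
    clauses = []
    # Literal predicates on the target's partition columns go first, so the
    # partitions touched by this job are stated explicitly in the condition.
    for column, values in partition_filters.items():
        quoted = ", ".join("'" + str(v).replace("'", "''") + "'" for v in values)
        clauses.append(f"{target}.{column} IN ({quoted})")
    # Equality predicates between target and source on the business keys.
    for key in join_keys:
        clauses.append(f"{target}.{key} = {source}.{key}")
    return " AND ".join(clauses)


condition = build_merge_condition(
    join_keys=["continent", "str_id", "rule_date", "rule_id", "rule_read_start", "company"],
    partition_filters={"continent": ["AFRICA"], "year": ["202301"]},
)
```

The resulting string can be passed as the merge condition in either the SQL statement or the Python `DeltaTable.merge` call. Whether this alone removes the conflict depends on the source rows of each job really being confined to the partitions named in the filter.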
But I am seeing a concurrency error:
com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [continent=AFRICA, year=2021] by a concurrent update. Please try the operation again.
I understand the error is telling me that it cannot update the files concurrently. But I have a huge volume of data in production, and I don't want to perform the merge at the table level, where there are almost 1 billion records, without proper filters.
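Since the error message itself says "Please try the operation again", one stopgap is to wrap each merge in a retry with backoff. The sketch below is an assumption-laden illustration, not a Delta Lake API: `run_with_retry` and its parameters are hypothetical, and it detects the conflict by looking for the exception name in the error text so that it does not depend on how the exception surfaces in Python.

```python
import random
import time


def run_with_retry(operation, max_attempts=5, base_delay_s=1.0,
                   retry_on=("ConcurrentAppendException",), sleep=time.sleep):
    """Call operation() and retry it when a known conflict error is raised.

    Hypothetical helper: `operation` is any zero-argument callable, for
    example a function that executes one partition-level merge.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            # Match on the exception name appearing in the type or message.
            text = f"{type(exc).__name__}: {exc}"
            retryable = any(name in text for name in retry_on)
            if not retryable or attempt == max_attempts:
                raise
            # Exponential backoff with jitter so parallel jobs do not
            # all retry at the same moment.
            delay = base_delay_s * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))
```

A merge would then be run as `run_with_retry(lambda: spark.sql(merge_sql))`, where `merge_sql` stands for the statement above. Retrying only hides the conflict; it does not make the parallel merges independent of each other.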
Trial 2: As an alternative approach,
- I saved my incremental dataframe in an S3 bucket (like a staging dir) and ended my streaming pipeline there.
- Then I have a separate PySpark job that reads data from that S3 staging dir and performs the merge into my main Delta table, once again at the partition level (I have specified the partitions in those jobs as filters).
But I am facing the same exception there as well.
Trial 3:
I also tried a different approach, as described in the link, following the `ConcurrentAppendException` section of that page.
from delta.tables import DeltaTable

base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
(
    base_delta.alias("main_dt")
    .merge(
        source=final_incremental_df.alias("df"),
        condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company AND main_dt.continent='AFRICA'",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
and
base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
(
    base_delta.alias("main_dt")
    .merge(
        source=final_incremental_df.alias("df"),
        # Same join keys as before, plus an explicit literal filter on the partition column.
        condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company AND main_dt.continent='ASIA'",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
I ran the above merge operations in two separate pipelines, but I am still facing the same issue.
Could anyone let me know how I can design and optimise my streaming pipeline to merge data into the Delta table at the partition level, with multiple jobs running in parallel (each job working on an individual partition)?