03-15-2024 01:59 AM
Hi,
I'm working on a job that propagates updates from a Delta table to Parquet files (a requirement of the consumer). The data is partitioned by day (year > month > day) and the daily data is updated every hour. I'm using a streaming table read with a checkpoint to load new data incrementally (appends and updates). The issue is that every time new data is loaded, the job creates a new .parquet file instead of overwriting the old one.
Dynamic partition overwrite mode does exactly this, but I have tried it and it doesn't work with the writeStream method.
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").format("parquet").partitionBy("year", "month", "day").save("/path")
I want to implement something similar to dynamic partition overwrite, but for incremental streaming data: new day data creates a new partition/file, and the following updates overwrite the latest partition.
Does anyone know of a way to use this mode with streaming data?
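For context, the incremental read looks roughly like this (a minimal sketch; the source path and the ignoreChanges option are my assumptions about the setup, since a plain Delta streaming read fails once the source receives updates):

# Sketch of the incremental read described above; path is a placeholder.
df = (spark.readStream
      .format("delta")
      .option("ignoreChanges", "true")  # assumption: re-emit updated rows instead of failing the stream
      .load("/path/to/delta_table"))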
Thanks in advance,
Jorge
03-16-2024 10:55 AM
Thanks for your answer @Retired_mod. I have tried what you suggested with the following code in PySpark. I also added some lines to handle the case where a batch contains multiple partitions, for example in the first ingestion, where I'm reading historical data:
from pyspark.sql.functions import col

output_location = 'path'

def upsert_to_parquet(batchDF, batchId):
    # Extract the partition keys (year, month, day) present in this micro-batch.
    # If more than one partition is affected, iterate over all of them.
    partitions = batchDF.select('year', 'month', 'day').distinct().collect()
    for partition in partitions:
        year, month, day = partition['year'], partition['month'], partition['day']
        # Construct the target path based on the partition keys
        partition_path = f'{output_location}data/year={year}/month={month}/day={day}'
        # Overwrite the existing data in the target partition
        (batchDF
            .filter((col('year') == year) & (col('month') == month) & (col('day') == day))
            .write.mode('overwrite')
            .parquet(partition_path))

df.writeStream \
    .foreachBatch(upsert_to_parquet) \
    .option('checkpointLocation', output_location + 'checkpoint') \
    .trigger(availableNow=True) \
    .start()
Apparently it solves the duplication on updates. However, there are some additional issues:
Ideally only one parquet file should be created per partition, but several are written. Do you know why this happens?
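One likely cause is that each Spark task in the filtered slice writes its own part file, so the file count follows the number of partitions of that DataFrame. A minimal sketch of forcing a single file per partition directory, by changing the write inside the same foreachBatch function (coalesce(1) funnels the write through one task, which assumes the daily partitions are small enough for that):

# Sketch: coalesce the per-partition slice to a single task before writing,
# so each partition directory ends up with exactly one parquet file.
(batchDF
    .filter((col('year') == year) & (col('month') == month) & (col('day') == day))
    .coalesce(1)
    .write.mode('overwrite')
    .parquet(partition_path))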
03-16-2024 12:11 PM
Why not migrate to Delta and just use MERGE inside forEachBatch?
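For reference, a minimal sketch of that pattern (the target path, checkpoint path, and the id join key are placeholders, not from this thread):

from delta.tables import DeltaTable

def merge_batch(batchDF, batchId):
    # Upsert the micro-batch into the target Delta table.
    target = DeltaTable.forPath(spark, '/path/to/target_delta')  # placeholder path
    (target.alias('t')
        .merge(batchDF.alias('s'), 't.id = s.id')  # placeholder join key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

df.writeStream \
    .foreachBatch(merge_batch) \
    .option('checkpointLocation', '/path/to/checkpoint') \
    .trigger(availableNow=True) \
    .start()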
06-12-2024 08:47 AM - edited 06-12-2024 08:56 AM
We had a similar situation. @Hubert-Dudek, we are using Delta, but we ran into problems when propagating updates via MERGE, as you cannot read the resulting table as a streaming source anymore... so using a complete overwrite of parquet partitions might be a good idea if you have to read the table as a streaming source.
This can give you the best of both worlds: the resulting parquet/delta will not have duplicated entries for each update (similar to Delta's MERGE), so if you read it as a static dataframe it will be clean. It also lets you read the "updated" rows as a stream, albeit much less efficiently than an update or merge.
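As a rough illustration of that second point, the overwritten partitions can be picked up again by a file-based streaming read, something like the following sketch (the output path is a placeholder, and the schema handling is an assumption, since file streaming sources need an explicit schema):

# Sketch: re-reading the overwritten parquet output as a stream.
# Grab the schema from a static read first; file sources require it up front.
static_df = spark.read.parquet('/path/to/output/data')
updates_stream = (spark.readStream
    .schema(static_df.schema)
    .parquet('/path/to/output/data'))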
However, I do not understand how @Retired_mod's response ensures that the data in the batch corresponds to a particular partition and is not scattered across several partitions.