03-15-2024 01:59 AM
Hi,
I'm working on a job that propagates updates from a Delta table to Parquet files (a requirement of the consumer). The data is partitioned by day (year > month > day) and the daily data is updated every hour. I'm using table read streaming with a checkpoint to load new data incrementally (appends and updates). The issue is that every time new data is loaded, the job creates a new .parquet file instead of overwriting the old one.
The dynamic partition overwrite mode does exactly this, but I have tried it and it doesn't work with the writeStream method.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
data.write.mode("overwrite").format("parquet").partitionBy("year", "month", "day").save("/path")
I want to implement something similar to dynamic partition overwrite but for incremental streaming data. So new-day data will create a new partition/file, and subsequent updates will overwrite the last partition.
Does anyone know of a way to use this mode with streaming data?
Thanks in advance,
Jorge
03-15-2024 02:15 AM
Hi @Jorge3, You're on the right track with dynamic partition overwrite mode.
However, when dealing with streaming data, there are some nuances to consider.
Dynamic Partition Overwrite Mode:
You can set the session configuration spark.sql.sources.partitionOverwriteMode to "dynamic", or set the query-level write option partitionOverwriteMode to "dynamic". If present, the query-specific option overrides the mode defined in the session configuration.
Streaming Data and Dynamic Partition Overwrite:
The writeStream method currently does not have an option to specify a partition overwrite mode.
Workaround:
Instead of using writeStream to write directly, consider using the foreachBatch method in your streaming query. Inside the foreachBatch function, you can explicitly handle the partitioning and overwrite logic.
For example, you can read the incoming streaming data, determine the partition keys (e.g., year, month, day), and then overwrite the existing data in the corresponding partition.
Here's a high-level example (pseudo-code) to illustrate the concept:
stream_data.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Extract partition keys (year, month, day) from batchDF
    val year = ...
    val month = ...
    val day = ...
    // Construct the target path based on partition keys
    val targetPath = s"/path/year=$year/month=$month/day=$day"
    // Overwrite existing data in the target partition
    batchDF.write.mode("overwrite").parquet(targetPath)
  }
  .start()
In this example, batchDF represents the streaming data for a specific batch, and you construct the target path dynamically based on the partition keys.
Checkpointing:
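The answer cuts off here, but checkpointing is what makes the incremental load resumable: the checkpoint directory records which source offsets have already been processed, so a restarted query does not re-deliver old batches. A minimal sketch, assuming the foreachBatch handler and base output path used elsewhere in this thread (the helper names here are mine, not from the thread):

```python
def checkpoint_path(output_location: str) -> str:
    # Keep the checkpoint in its own directory next to the data so the
    # stream can resume from the last committed offsets after a restart.
    return output_location.rstrip('/') + '/checkpoint'

def start_upsert_stream(df, upsert_fn, output_location: str):
    # df is a streaming DataFrame read from the Delta table;
    # upsert_fn is the foreachBatch handler (e.g. the reply's upsert_to_parquet).
    return (df.writeStream
              .foreachBatch(upsert_fn)
              .option('checkpointLocation', checkpoint_path(output_location))
              .trigger(availableNow=True)
              .start())
```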
03-16-2024 10:55 AM
Thanks for your answer @Kaniz_Fatma. I have tried what you suggested with the following code in PySpark. I also added some lines to handle scenarios where I receive multiple files/partitions: for example, the first ingestion, where I'm reading historical data:
from pyspark.sql.functions import col

output_location = 'path'

def upsert_to_parquet(batchDF, batchId):
    # Extract partition keys (year, month, day) from batchDF.
    # In case more than one partition is updated, iterate over them.
    partitions = batchDF.select('year', 'month', 'day').distinct().collect()
    for partition in partitions:
        year, month, day = partition['year'], partition['month'], partition['day']
        # Construct the target path based on partition keys
        partition_path = f'{output_location}data/year={year}/month={month}/day={day}'
        # Overwrite existing data in the target partition
        batchDF.filter((col('year') == year) & (col('month') == month) & (col('day') == day)) \
            .write.mode('overwrite').parquet(partition_path)

df.writeStream \
    .foreachBatch(upsert_to_parquet) \
    .option('checkpointLocation', output_location + 'checkpoint') \
    .trigger(availableNow=True) \
    .start()
Apparently it solves the duplication on updates. However, there are some additional issues:
Ideally only one parquet file should be created per partition. Do you know why this happens?
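On the "only one parquet file" point: Spark writes one output file per task, and a batch's rows for a given day are usually spread over several tasks, so each overwrite can leave several files in the partition directory. A hedged sketch of one way around it, coalescing each partition's rows into a single task before writing (reasonable only while a day's data fits comfortably in one executor; the helper name partition_path is mine):

```python
def partition_path(output_location: str, year, month, day) -> str:
    # Hive-style day directory, matching the layout used above.
    return f'{output_location}data/year={year}/month={month}/day={day}'

def upsert_single_file(batchDF, output_location: str):
    # Imported here so the path helper above stays usable without Spark.
    from pyspark.sql.functions import col
    for p in batchDF.select('year', 'month', 'day').distinct().collect():
        y, m, d = p['year'], p['month'], p['day']
        (batchDF
            .filter((col('year') == y) & (col('month') == m) & (col('day') == d))
            .coalesce(1)  # one task -> one output file for this partition
            .write.mode('overwrite')
            .parquet(partition_path(output_location, y, m, d)))
```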
03-16-2024 12:11 PM
Why not migrate to Delta and just use MERGE inside forEachBatch?
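For reference, a minimal sketch of what that could look like with the delta-spark API; target_path and the business key id here are assumptions for illustration, not something from the thread:

```python
def merge_condition(keys):
    # Build the ON clause of the MERGE from the business key columns.
    return ' AND '.join(f't.{k} = s.{k}' for k in keys)

def merge_batch(batchDF, batch_id, target_path='path/delta'):
    # Hypothetical foreachBatch handler: upsert the micro-batch into a
    # Delta table instead of overwriting parquet partitions.
    from delta.tables import DeltaTable
    target = DeltaTable.forPath(batchDF.sparkSession, target_path)
    (target.alias('t')
           .merge(batchDF.alias('s'), merge_condition(['id']))
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())
```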
3 weeks ago - last edited 3 weeks ago
We had a similar situation, @Hubert-Dudek. We are using Delta, but we are having some problems when propagating updates via merge, as you cannot read the resulting table as a streaming source anymore... so a complete overwrite of parquet partitions might be a good idea if you have to read the table as a streaming source.
This can give you the best of both worlds: the resulting parquet/delta will not have duplicated entries for each update (similar to Delta's merge), so if you read it as a static dataframe it will be clean. It also lets you read "updated" rows using streaming, albeit much less efficiently than update or merge.
However, I do not understand how @Kaniz_Fatma's response ensures that the data in the batch corresponds to a particular partition and is not scattered among several partitions.
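For what it's worth, the per-partition loop in the reply above is one answer to that concern. Another (a sketch, not tested here) is to let dynamic partition overwrite handle the scattering inside foreachBatch: the write there is a plain batch write, so with partitionOverwriteMode set to dynamic, mode('overwrite') replaces only the partitions actually present in the batch. The base_path default is illustrative:

```python
def overwrite_touched_partitions(batchDF, batch_id, base_path='path/data'):
    # A plain batch write runs inside foreachBatch, so the batch-only
    # dynamic partition overwrite mode applies: only the day directories
    # present in batchDF are replaced; all others are left untouched.
    spark = batchDF.sparkSession
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    (batchDF.write
            .mode('overwrite')
            .partitionBy('year', 'month', 'day')
            .parquet(base_path))
```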