cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Dynamic partition overwrite with Streaming Data

Jorge3
New Contributor III

Hi,

I'm working on a job that propagate updates of data from a delta table to a parquet files (requirement of the consumer). The data is partitioned by day (year > month > day) and the daily data is updated every hour. I'm using table read streaming with checkpoint to load new data incrementally (appends and updates). The issue is that every time the new data is loaded, the job creates a new .parquet file instead of overwrite the old one.

The dynamic partition overwrite mode does exactly this, but I have tried and it didn't work with the writeStream method.

 

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table").format("parquet").partitionBy("year", "month", "day").save("/path")

 

I want to implement something similar to the dynamic partition overwrite but for incremental streaming data. So, new day data will create a new partition/file, and the following updates overwrites the last partition.

Does anyone know of a way to use this mode with streaming data?

Thanks in advance,
Jorge

3 REPLIES 3

Jorge3
New Contributor III

Thanks for your answer @Retired_mod. I have tried what you suggested with the following code in PySpark. I also add some lines to handle scenarios where receiving multiple files/partitions: for example in the first ingestion where I'm reading historical data:

output_location = 'path'

def upsert_to_parquet(batchDF, batchId):
    # Extract partition keys (year, month, day) from batchDF
    # In case more than one partition is updated iterate over them
    partitions = batchDF.select('year', 'month', 'day').distinct().collect()
    for partition in partitions:
        year, month, day = partition['year'], partition['month'], partition['day']

        # Construct the target path based on partition keys
        partition_path = f'{output_location}data/year={year}/month={month}/day={day}'

        # Overwrite existing data in the target partition
        batchDF.filter((col('year') == year) & (col('month') == month) & (col('day') == day)).write.mode('overwrite').parquet(partition_path)


df.writeStream \
    .foreachBatch(upsert_to_parquet) \
    .option('checkpointLocation', output_location + 'checkpoint') \
    .trigger(availableNow = True) \
    .start()

Apparently it solves the duplication on updates. However there are some additional issues:

  • Some extra files appear on my directory on every partition with logs: _committed_xxx, _started_xxx, _SUCCESS files
  • For each partition I got two .parquet files: part-0000-xxx.snappy.parquet, part-0001-xxx.snappy.parquet. The first one is empty, with only the schema information (column names). And the second one contains all the data

Ideally only one parquet file should be created. Do you know why this happen?

 

 

 

 

Hubert-Dudek
Esteemed Contributor III

Why not migrate to Delta and just use MERGE inside forEachBatch?

JacintoArias
New Contributor III

We had a similar situation, @Hubert-Dudek we are using delta, but we are having some problems when propagating updates via merge, as you cannot read the resulting table as streaming source anymore... so using complete overwrite over parquet partitions might be a good idea if you have to read the table as a streaming source.

This can give you best of both worlds, the resulting parquet/delta will not have duplicated entries for each update (similar to delta's merge), so if you read as a static dataframe it will be clean. Also you to read "updated" rows using streaming, alas being much more inneficient than update or merge.

However I do not understand how @Retired_mod response manages to ensure that the data in the batch corresponds with a particular partition and is not scattered among several partitions.