Dynamic partition overwrite with Streaming Data

Jorge3
New Contributor III

Hi,

I'm working on a job that propagates updates from a Delta table to Parquet files (a requirement of the consumer). The data is partitioned by day (year > month > day), and the daily data is updated every hour. I'm using table read streaming with a checkpoint to load new data incrementally (appends and updates). The issue is that every time new data is loaded, the job creates a new .parquet file instead of overwriting the old one.

Dynamic partition overwrite mode does exactly this, but I have tried it and it didn't work with the writeStream method.

 

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
data.write.mode("overwrite").format("parquet").partitionBy("year", "month", "day").save("/path")

 

I want to implement something similar to dynamic partition overwrite, but for incremental streaming data: each new day's data creates a new partition/file, and subsequent updates overwrite the latest partition.
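For reference, the incremental read side looks roughly like this (just a sketch; the source path is a placeholder, not my real one):

# Sketch of the incremental read from the Delta source (path is a placeholder)
df = (
    spark.readStream
        .format('delta')
        .load('/path/to/source_delta_table')  # partitioned by year/month/day
)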

Does anyone know of a way to use this mode with streaming data?

Thanks in advance,
Jorge

3 REPLIES

Kaniz
Community Manager

Hi @Jorge3, you’re on the right track with dynamic partition overwrite mode.

However, when dealing with streaming data, there are some nuances to consider.

  1. Dynamic Partition Overwrite Mode:

    • Databricks Runtime 11.1 and above supports dynamic partition overwrite mode for partitioned tables.
    • In this mode, operations overwrite all existing data in each logical partition for which the write commits new data.
    • For tables with multiple partitions, Databricks Runtime 12.0 and below only support dynamic partition overwrites if all partition columns are of the same data type.
    • To enable dynamic partition overwrite mode, set the Spark session configuration spark.sql.sources.partitionOverwriteMode to "dynamic".
    • You can also enable this by setting the DataFrameWriter option partitionOverwriteMode to "dynamic". If present, the query-specific option overrides the mode defined in the session configuration (a short example of both forms follows this list).
  2. Streaming Data and Dynamic Partition Overwrite:

    • Unfortunately, dynamic partition overwrite mode is not directly supported with streaming data using the writeStream method.
    • The writeStream method currently does not have an option to specify partition overwrite mode.
    • However, you can achieve similar behavior by manually managing the partitioning logic in your streaming job.
  3. Workaround:

    • Instead of using writeStream, consider using the foreachBatch method in your streaming query.

    • Inside the foreachBatch function, you can explicitly handle the partitioning and overwrite logic.

    • For example, you can read the incoming streaming data, determine the partition keys (e.g., year, month, day), and then overwrite the existing data in the corresponding partition.

    • Here’s a high-level example (pseudo-code) to illustrate the concept:

      stream_data.writeStream
          .foreachBatch { (batchDF, batchId) =>
              // Extract partition keys (year, month, day) from batchDF
              val year = ...
              val month = ...
              val day = ...
      
              // Construct the target path based on partition keys
              val targetPath = s"/path/year=$year/month=$month/day=$day"
      
              // Overwrite existing data in the target partition
              batchDF.write.mode("overwrite").parquet(targetPath)
          }
          .start()
      
    • In this example, batchDF represents the streaming data for a specific batch, and you construct the target path dynamically based on the partition keys.

  4. Checkpointing:

    • Since you’re already using checkpointing, ensure that your checkpoint directory is set correctly to maintain the streaming state across restarts.
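To make point 1 concrete, here is a minimal batch-write sketch showing both ways to enable dynamic partition overwrite (the DataFrame, path and partition columns are placeholders):

# Option A: session-level configuration
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(df.write
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .format("parquet")
    .save("/path/to/output"))

# Option B: per-query DataFrameWriter option (overrides the session setting)
(df.write
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("year", "month", "day")
    .format("parquet")
    .save("/path/to/output"))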

Jorge3
New Contributor III

Thanks for your answer @Kaniz. I have tried what you suggested with the following code in PySpark. I also added some lines to handle scenarios where multiple files/partitions are received, for example on the first ingestion, where I'm reading historical data:

from pyspark.sql.functions import col

output_location = 'path'

def upsert_to_parquet(batchDF, batchId):
    # Extract partition keys (year, month, day) from batchDF
    # In case more than one partition is updated iterate over them
    partitions = batchDF.select('year', 'month', 'day').distinct().collect()
    for partition in partitions:
        year, month, day = partition['year'], partition['month'], partition['day']

        # Construct the target path based on partition keys
        partition_path = f'{output_location}data/year={year}/month={month}/day={day}'

        # Overwrite existing data in the target partition
        (batchDF
            .filter((col('year') == year) & (col('month') == month) & (col('day') == day))
            .write
            .mode('overwrite')
            .parquet(partition_path))


df.writeStream \
    .foreachBatch(upsert_to_parquet) \
    .option('checkpointLocation', output_location + 'checkpoint') \
    .trigger(availableNow = True) \
    .start()

Apparently it solves the duplication on updates. However, there are some additional issues:

  • Extra log files appear in every partition directory: _committed_xxx, _started_xxx and _SUCCESS files.
  • For each partition I get two .parquet files: part-0000-xxx.snappy.parquet and part-0001-xxx.snappy.parquet. The first one is empty, with only the schema information (column names), and the second one contains all the data.

Ideally only one parquet file should be created. Do you know why this happens?
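The only tweak I can think of for the two-file issue is coalescing each filtered batch to a single task before writing, something like this (just a sketch, I'm not sure it's the right fix):

# Coalesce the filtered batch to one task so each partition writes a single part file
# (assumes the per-partition data is small enough for one task)
(batchDF
    .filter((col('year') == year) & (col('month') == month) & (col('day') == day))
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet(partition_path))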

Hubert-Dudek
Esteemed Contributor III

Why not migrate the output to Delta and just use MERGE inside foreachBatch?
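Roughly something like this (a sketch only; it assumes the target Delta table already exists at the given path, and the merge keys are placeholders you would adapt to your data):

from delta.tables import DeltaTable

target_path = '/path/to/delta/output'  # placeholder

def merge_to_delta(batchDF, batchId):
    # Upsert the micro-batch into the Delta target; merge keys are placeholders
    target = DeltaTable.forPath(spark, target_path)
    (target.alias('t')
        .merge(
            batchDF.alias('s'),
            't.year = s.year AND t.month = s.month AND t.day = s.day AND t.id = s.id')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(df.writeStream
    .foreachBatch(merge_to_delta)
    .option('checkpointLocation', output_location + 'checkpoint')
    .trigger(availableNow=True)
    .start())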
