Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Partition optimization strategy for a task that massively inflates the size of a dataframe

Steffen
New Contributor III

Hello,

we are facing some optimization problems with a workflow that interpolates raw measurements to one-second intervals.

We are currently using dbl-tempo for this, but had the same issues with a simple approach using window functions.

we have the following input dataframe:

  • columns: id, timestamp, value
  • around 300,000 unique ids
  • some ids have high-frequency data (e.g. a new value every few seconds)
  • some ids have only one entry per day or week

We need to interpolate all measurements to one-second intervals. We want to run this for one week of data.

our current approach is:

1. read raw measurements from the table
2. create a dummy start_ts row for each signal at the beginning of the interpolation timespan, carrying the latest value of that signal at that time
3. create a dummy end_ts row at the end of the timespan with a null value

then interpolate the df like this:

tsdf_aligned = tsdf_measurements.interpolate(
    freq="1 seconds",
    func="ceil",
    method="ffill",
    partition_cols=["id"],
    perform_checks=False,
)
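For reference, the boundary-row preparation in steps 2 and 3 can be sketched in pure Python (the real job does this with DataFrame operations; the function name, the `(timestamp, value)` row layout, and the window arguments are illustrative assumptions):

```python
def add_boundary_rows(rows, window_start, window_end):
    """Prepend a dummy start row carrying the last known value before
    the window, and append a dummy end row with a null value.

    rows: list of (timestamp, value) tuples for ONE signal, sorted by
    timestamp; timestamps are epoch seconds. Illustrative sketch only.
    """
    before = [(ts, v) for ts, v in rows if ts < window_start]
    inside = [(ts, v) for ts, v in rows if window_start <= ts <= window_end]

    out = []
    if before:
        # step 2: dummy start_ts with the latest value seen before the window
        out.append((window_start, before[-1][1]))
    out.extend(inside)
    # step 3: dummy end_ts with a null value
    out.append((window_end, None))
    return out
```

For example, a signal with one reading before the window and one inside it ends up with three rows: the carried-forward start row, the original reading, and the null end row.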

As Spark creates the partitions of the input df based on the raw data, which contains both high-frequency and very low-frequency signals, the partitions contain very different numbers of measurements per signal:

  • one partition may contain only a few signals with high-frequency data
  • another partition may contain the data of 1000s of signals that have only a few measurements in the timespan

This results in data skew in the interpolation step, because the partitions holding low-frequency measurements suddenly inflate to one entry per second per signal.
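The inflation factor is easy to quantify (a back-of-the-envelope sketch; the function name and the grid anchored at the first timestamp are illustrative assumptions):

```python
def expected_output_rows(first_ts, last_ts, freq_seconds=1):
    """Rows a signal produces after interpolation to a fixed grid:
    one row per freq_seconds between its first and last timestamp,
    inclusive. Timestamps are epoch seconds. Assumes the grid is
    anchored at first_ts (illustrative)."""
    return (last_ts - first_ts) // freq_seconds + 1

WEEK = 7 * 24 * 3600  # 604_800 seconds

# a low-frequency signal spanning the whole week inflates from a handful
# of raw readings to ~600k one-second rows
print(expected_output_rows(0, WEEK - 1))  # 604800
```

So a partition holding 1000 sparse signals that each span the week produces roughly 600 million output rows, regardless of how few input rows it held.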

For example, if we don't configure any Spark conf, the input df for one week of data will have 225 partitions:

(screenshot of the partition distribution omitted)

We observed huge spilling and a bad distribution.

If we force Spark to create smaller partitions, we can reduce the spill a bit but still have huge skew in the data, e.g. we set

spark.conf.set("spark.sql.files.maxPartitionBytes", "16m")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "16m")

and end up with 1100 partitions in the interpolation step:

(screenshot of the partition distribution omitted)

but the data is still heavily skewed.

We tried calling repartitionByRange(4000, "id") on the df to create a more evenly distributed input df, but this creates some partitions where 1000s of low-frequency signals are grouped together and some partitions with only a few signals (as Spark tries to make partitions of evenly sized input).
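The mismatch can be shown numerically (a toy simulation, not Spark itself; the signal counts and the four-way packing are made-up assumptions): balancing partitions by input row count leaves the post-interpolation output row count badly skewed, because sparse signals inflate to the full week of one-second samples.

```python
# Toy model: each signal is (input_rows, output_rows_after_interpolation).
# A dense signal already has many rows and barely inflates; a sparse
# signal has few rows but inflates to a full week of one-second samples.
high_freq = [(600_000, 604_800)] * 2     # 2 dense signals
low_freq = [(5, 604_800)] * 1000         # 1000 sparse signals
signals = high_freq + low_freq

# Mimic range partitioning by id, which balances INPUT rows: pack
# signals greedily until each partition holds ~equal input rows.
target_input = sum(s[0] for s in signals) // 4  # aim for ~4 partitions
partitions, current, acc = [], [], 0
for inp, out in signals:
    current.append((inp, out))
    acc += inp
    if acc >= target_input:
        partitions.append(current)
        current, acc = [], 0
if current:
    partitions.append(current)

for i, part in enumerate(partitions):
    print(f"partition {i}: input={sum(s[0] for s in part):>9,} "
          f"output={sum(s[1] for s in part):>13,}")
```

In this toy run the two dense signals each land in their own partition with ~600k output rows, while the 1000 sparse signals are packed together and explode to ~600 million output rows in a single partition, which is exactly the skew we observe.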

We are running out of ideas on how to prepare the input df so that the partitions are well defined. Are we missing something, or is our approach completely wrong?

1 REPLY

balajij8
Contributor

 

Hi, key points below:

  • Time window chunking - avoid interpolating a full week of data in a single Spark action. Split the workload into daily or 12-hour slices. This caps the maximum memory pressure, enables parallel execution, and simplifies failure recovery.
  • Estimate expected output size - before interpolation, compute how many output rows each id will generate in the target window. This is derived from the duration between the earliest and latest measurement for each signal within the chunk.
  • Create a synthetic workload partition key - group signals into buckets based on their expected post-interpolation row count. Target roughly equal memory per bucket (e.g. ~150 MB of expected interpolated data per partition).
  • Repartition by workload bucket - apply a repartition using the synthetic bucket key. This ensures each Spark executor handles a comparable interpolation workload after expansion, eliminating the severe skew that causes disk spill and straggler tasks.
  • Keep Adaptive Query Execution enabled, with advisory partition sizing tuned to your bucket target. Photon will accelerate sequence generation and sort operations without manual tuning.
  • Persist the interpolated output with Delta Lake using liquid clustering on id, timestamp. Liquid clustering optimizes downstream query pruning.
  • Evaluate whether all signals require 1-second granularity. Apply a tiered interpolation strategy: critical signals at 1 s, low-variability signals at 60 s.
  • Track partition distribution, spill volume, and task duration metrics to iteratively adjust the bucket sizing.
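The workload-bucket idea can be sketched in pure Python (a simplified greedy assignment; the per-id expected row counts would come from a Spark aggregation, and the function name and target size are illustrative assumptions):

```python
def assign_buckets(expected_rows_per_id, target_rows_per_bucket):
    """Greedily group ids into buckets so each bucket holds roughly
    target_rows_per_bucket expected post-interpolation rows.
    Returns {id: bucket_number}; the bucket number would then be used
    as the repartition key. Ids larger than the target get a bucket
    of their own."""
    assignment, bucket, acc = {}, 0, 0
    # largest first, so one oversized id doesn't trap many small ones
    for signal_id, rows in sorted(expected_rows_per_id.items(),
                                  key=lambda kv: -kv[1]):
        if acc > 0 and acc + rows > target_rows_per_bucket:
            bucket, acc = bucket + 1, 0
        assignment[signal_id] = bucket
        acc += rows
    return assignment

# toy example: two dense signals and four sparse ones
expected = {"a": 600_000, "b": 600_000, "c": 90, "d": 80, "e": 70, "f": 60}
buckets = assign_buckets(expected, target_rows_per_bucket=600_000)
print(buckets)
```

In the real job one would join this mapping back onto the input df and repartition on the bucket column before interpolating, so each partition carries a comparable amount of post-expansion data.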