Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Slow stream-static join in Spark Structured Streaming

EDDatabricks
Contributor

Situation

Records are streamed from an input Delta table via a Spark Structured Streaming job. The streaming job performs the following steps:

  1. Read from the input Delta table (readStream)
  2. Static join on a small JSON file (steps 1 and 2 are sketched below)
  3. Static join on a big Delta table
  4. Write to three Delta tables using foreachBatch logic
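
For context, steps 1 and 2 look roughly like this (table name, file path and join key are simplified):

from pyspark.sql import functions as F

# Step 1: read the input Delta table as a stream
streaming_records = (
    spark.readStream
    .format('delta')
    .table('input_delta_table')          # simplified table name
)

# Step 2: static join on the small JSON lookup (broadcasting it is cheap, the file is small)
small_json_df = spark.read.json('/path/to/small_lookup.json')   # simplified path
streaming_records = streaming_records.join(
    F.broadcast(small_json_df),
    on='lookup_key',                     # simplified join key
    how='left'
)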

Problem

Step 3 is extremely slow. It takes more than 15 minutes to process a single batch of data on a job compute cluster with 2 Standard_DS3_v2 workers. Moreover, after 2-4 hours the job fails with an Out Of Memory exception. Looking at the Metrics tab of the cluster, we notice data spilling to disk. The screenshot below shows the spill.

[Screenshot: cluster Metrics tab showing spill to disk (EDDatabricks_1-1703760391974.png)]

Code snippets

The code snippet below shows step 3: the static join on the big Delta table. In essence, the big Delta table is loaded, de-duplicated, and joined to the streaming records. Every time a batch is processed, the big Delta table is re-read, de-duplicated, and joined to the batch of streaming records.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Load big Delta table
big_delta_table = (
    spark.read
    .format('delta')
    .table('big_delta_table')
)

# De-duplicate big Delta table on c1: keep the most recent row per c1
c1_id_window = Window.partitionBy('c1').orderBy(F.col('updatedOn').desc())
c1_data = (
    big_delta_table
    .filter(F.col('c1').isNotNull())
    .withColumn(
        'row_num',
        F.row_number().over(c1_id_window)
    )
    .filter(F.col('row_num') == 1)
    .drop('row_num')
)

# De-duplicate big Delta table on c2: keep the most recent row per c2 for rows without a c1
c2_id_window = Window.partitionBy('c2').orderBy(F.col('updatedOn').desc())
c2_data = (
    big_delta_table
    .filter(F.col('c2').isNotNull() & F.col('c1').isNull())
    .withColumn(
        'row_num',
        F.row_number().over(c2_id_window)
    )
    .filter(F.col('row_num') == 1)
    .drop('row_num')
)

clean_big_delta_table = c1_data.union(c2_data)

# Join streaming records with the de-duplicated big Delta table
# (join_condition is defined elsewhere)
joined_records = (
    streaming_records
    .join(
        F.broadcast(clean_big_delta_table),
        join_condition,
        'left'
    )
)

The following code snippet shows step 4: the foreachBatch logic. The batch is persisted to avoid recomputing the same data three times, the data is then written to three distinct sinks, and finally the batch is unpersisted.

from pyspark.sql import DataFrame

def write_gold_tables(input_df: DataFrame, batch_id: str):
    # Cache the micro-batch so it is not recomputed for each of the three writes
    input_df.persist()

    # Write to Delta table sink 1

    # Write to Delta table sink 2

    # Write to Delta table sink 3

    input_df.unpersist(blocking=True)
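
For completeness, the foreachBatch function is wired into the stream roughly as follows (checkpoint location and trigger interval are placeholders):

query = (
    joined_records
    .writeStream
    .foreachBatch(write_gold_tables)
    .option('checkpointLocation', '/path/to/checkpoint')   # placeholder path
    .trigger(processingTime='1 minute')                     # placeholder trigger interval
    .start()
)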

Questions

  • Is there a way to optimize the code above to increase the speed of the streaming job?
  • How can we avoid the data spill to disk?
  • What is the root cause of the OOM exception?
1 REPLY

Wojciech_BUK
Valued Contributor III

You are using quite small machines; please take into consideration that a lot of each machine's memory is occupied by other processes:

https://kb.databricks.com/clusters/spark-shows-less-memory

It is not a good idea to broadcast huge DataFrames, as it can lead to the OOM exception you are getting: a Spark worker cannot hold the micro-batch coming from the stream, the big broadcast DataFrame, and the third DataFrame produced by the join all at once.
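
For example, simply dropping the broadcast hint from your step 3 join lets Spark choose a shuffle-based join for the big table instead (identifiers as in your snippet):

# Same join as in step 3, but without forcing a broadcast of the big table;
# Spark only broadcasts it automatically if it is below spark.sql.autoBroadcastJoinThreshold.
joined_records = (
    streaming_records
    .join(
        clean_big_delta_table,   # no F.broadcast() hint
        join_condition,
        'left'
    )
)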

The easiest ways forward would be to:

- move the big DataFrame transformation (de-duplication) out to a separate process and read the already-cleaned data in the streaming job (see the sketch after this list). Do you always need the entire DataFrame to be cached?

- control your streaming batch size so that it fits in memory together with the other cached DataFrames; there are reader options for that (also shown below)

- make sure your data is partitioned and distributed for parallel execution

- size the cluster properly: you pay per hour, so if you use a bigger cluster and the job finishes much faster, you pay less. If there are spills to disk, everything gets slower and you pay more. Estimate your DataFrame sizes, batch size and parallelism (partitions), and adjust cluster memory, cores and number of workers accordingly.
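
A minimal sketch of the first two points, assuming a hypothetical clean_big_delta_table_gold target table and illustrative option values:

# Separate batch job: de-duplicate the big table once and persist the result
(
    clean_big_delta_table
    .write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('clean_big_delta_table_gold')   # hypothetical target table
)

# Streaming job: read the already-cleaned table instead of de-duplicating every micro-batch
static_clean_df = spark.read.table('clean_big_delta_table_gold')

# Limit how much data each micro-batch pulls from the Delta source
streaming_records = (
    spark.readStream
    .format('delta')
    .option('maxFilesPerTrigger', 100)           # illustrative value; caps micro-batch size
    .table('input_delta_table')                  # hypothetical source table name
)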
