cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Reading two big tables within each forEachBatch processing method

mjedy7
New Contributor II

I am reading changes from the cdf with availableOnce=True, processing data from checkpoint to checkpoint. During each batch, I perform transformations, but I also need to read two large tables and one small table. Does Spark read these tables from scratch for each batch, or caching internally? 

 

# Read
df = self.spark.readStream.format("delta").option("readChangeFeed", "true")

# Transform
def transform(batch_df):
   # Read delta table 230 GB
   big_side_df = spark.read.format("delta")...
   # Read small df 

   # Do aggregations
   batchdf.join(broadcast(small))
   batchdf.join(bigside_df)....
   return final_df

def process_each_batch(df, _):
   transformed df = transform(df)
   
   # Do Upsert
   df.upsert()


self.df.writeStream.format(DeltaUtils.FORMAT_DELTA)
            .option("checkpointLocation", self.checkpoint_loc)
            .foreachBatch(self.process_each_batch)
            .outputMode("update")
            .trigger(availableNow=True)
            .start()

 

Also I am pretty new and I want to optimize the jobs but I am not sure where to focus on.

First things to come my minds are

  • Adding maxBytesPerTrigger
  • Reading from outside of the each batch and cache?
  • Doing aggregations and prepare big df's to be consumed outside of the job(with another job in the workflow)

Any suggestion would be appreciated.

 

1 REPLY 1

radothede
Contributor II

Hi @mjedy7 

for cacheing in this scenario You could try to levarage persist() and unpersist() for the big table/ spark dataframe, see here:

https://medium.com/@eloutmadiabderrahim/persist-vs-unpersist-in-spark-485694f72452

Try to reduce the amount of data in big spark df You will cache, by reading only the neccessary columns, filtering data (if possible), precompute etc. Run vacuum and optimize on Your table regurarly, consider zordering the data to help spark skipping/ pruning the data aswell.

Broadcasting small table might be good idea.

Setting maxBytesPerTrigger/ maxFilesPerTrigger is for sure good idea.

Make sure your upsert is performing well.

Running the job please use the Spark UI to validate performance:

- monitor usage of the %CPU for each node, make sure Your job utilize all cpu evenly,

- check whats the number of tasks processing during the job execution - if there is a need to repartition/coalesce your input data or use aqe

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group