Reading two big tables within each forEachBatch processing method
11-24-2024 11:44 PM - edited 11-24-2024 11:45 PM
I am reading changes from the CDF with availableNow=True, processing data from checkpoint to checkpoint. During each batch I perform transformations, but I also need to read two large tables and one small table. Does Spark read these tables from scratch for each batch, or does it cache them internally?
# Read
self.df = self.spark.readStream.format("delta").option("readChangeFeed", "true")

# Transform
def transform(batch_df):
    # Read delta table (230 GB)
    big_side_df = spark.read.format("delta")...
    # Read small df
    small_df = spark.read.format("delta")...
    # Do aggregations
    batch_df = batch_df.join(broadcast(small_df), ...)
    final_df = batch_df.join(big_side_df, ...)
    return final_df

def process_each_batch(batch_df, _):
    transformed_df = transform(batch_df)
    # Do upsert
    transformed_df.upsert()

(
    self.df.writeStream.format(DeltaUtils.FORMAT_DELTA)
    .option("checkpointLocation", self.checkpoint_loc)
    .foreachBatch(self.process_each_batch)
    .outputMode("update")
    .trigger(availableNow=True)
    .start()
)
Also, I am pretty new to this and want to optimize the job, but I am not sure where to focus.
The first things that come to mind are:
- Adding maxBytesPerTrigger
- Reading the tables outside of each batch and caching them?
- Doing the aggregations and preparing the big DataFrames with another job in the workflow, so this job only consumes them (a rough sketch of this idea is below)
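Roughly, what I have in mind for the third option (all table and column names below are placeholders, not my real schema):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical table names; replace with the real tables in the workflow.
BIG_TABLE = "catalog.schema.big_side_table"
PREAGG_TABLE = "catalog.schema.big_side_preagg"

# Read only the columns the join/aggregation actually needs.
big_df = (
    spark.read.table(BIG_TABLE)
    .select("join_key", "amount", "event_date")
)

# Pre-aggregate once per workflow run instead of once per micro-batch.
preagg_df = (
    big_df.groupBy("join_key")
    .agg(
        F.sum("amount").alias("total_amount"),
        F.max("event_date").alias("last_event_date"),
    )
)

# Persist the pre-aggregated result; the streaming job would then join
# against this much smaller table inside foreachBatch.
preagg_df.write.format("delta").mode("overwrite").saveAsTable(PREAGG_TABLE)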
Any suggestion would be appreciated.
Labels: Spark
11-25-2024 01:05 PM
Hi @mjedy7
For caching in this scenario, you could try to leverage persist() and unpersist() for the big table / Spark DataFrame, see here:
https://medium.com/@eloutmadiabderrahim/persist-vs-unpersist-in-spark-485694f72452
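For example, something along these lines (a rough sketch, not your actual code; the table names, columns, and join key are placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.getOrCreate()

# Read the big side table once, outside foreachBatch, keeping only the
# columns that are actually needed (placeholder names).
big_side_df = (
    spark.read.table("catalog.schema.big_side_table")
    .select("join_key", "col_a", "col_b")
    .persist(StorageLevel.MEMORY_AND_DISK)  # spill to disk if it does not fit in memory
)

small_df = spark.read.table("catalog.schema.small_table")

def process_each_batch(batch_df, batch_id):
    # The persisted DataFrame is reused here for every micro-batch instead
    # of being re-read from storage each time.
    joined = (
        batch_df.join(broadcast(small_df), "join_key")
                .join(big_side_df, "join_key")
    )
    # ... upsert `joined` into the target table ...

# After the availableNow run has finished, release the cache:
# big_side_df.unpersist()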
Try to reduce the amount of data in the big Spark DataFrame you will cache by reading only the necessary columns, filtering the data (if possible), precomputing, etc. Run VACUUM and OPTIMIZE on your table regularly, and consider Z-ordering the data to help Spark with data skipping/pruning as well.
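For example (table and column names are placeholders; run this as a scheduled maintenance job, not inside the streaming job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compact small files and cluster the data by the join/filter column.
spark.sql("OPTIMIZE catalog.schema.big_side_table ZORDER BY (join_key)")

# Remove files no longer referenced by the table (default retention is 7 days).
spark.sql("VACUUM catalog.schema.big_side_table RETAIN 168 HOURS")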
Broadcasting the small table might be a good idea.
Setting maxBytesPerTrigger / maxFilesPerTrigger is definitely a good idea.
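For example, on the change feed read (the limits and table name are illustrative, tune them for your data volume):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Caps how much data each micro-batch admits; with availableNow the backlog
# is then processed as a series of bounded batches.
df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("maxBytesPerTrigger", "10g")
    .option("maxFilesPerTrigger", 1000)
    .table("catalog.schema.source_table")
)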
Make sure your upsert is performing well.
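If the upsert is a Delta MERGE inside foreachBatch, the usual shape is roughly this (the target table name and join key are placeholders):

from delta.tables import DeltaTable

def process_each_batch(batch_df, batch_id):
    # Placeholder target table and key column.
    target = DeltaTable.forName(batch_df.sparkSession, "catalog.schema.target_table")
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.join_key = s.join_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )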
When running the job, use the Spark UI to validate performance:
- monitor the %CPU usage for each node and make sure your job utilizes all CPUs evenly,
- check the number of tasks running during job execution to see whether you need to repartition/coalesce your input data or use AQE (a small config sketch is below).
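For example (values and the key name are illustrative):

from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.getOrCreate()

# Adaptive Query Execution (AQE) coalesces small shuffle partitions and
# handles skewed joins at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

def repartition_for_join(batch_df: DataFrame) -> DataFrame:
    # Alternatively, repartition the micro-batch on the join key before a
    # heavy join; the partition count (200) is illustrative.
    return batch_df.repartition(200, "join_key")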

