I am reading changes from the Delta change data feed (CDF) with trigger(availableNow=True), processing data from checkpoint to checkpoint. In each micro-batch I perform transformations, but I also need to read two large tables and one small table. Does Spark read these tables from scratch for every batch, or does it cache them internally?
from pyspark.sql.functions import broadcast

# Read the change data feed as a stream
self.df = self.spark.readStream.format("delta").option("readChangeFeed", "true")...

# Transform
def transform(self, batch_df):
    # Read the 230 GB Delta side table
    big_side_df = self.spark.read.format("delta")...
    # Read the small side table
    small_df = self.spark.read.format("delta")...
    # Do aggregations: broadcast-join the small table, then join the big one
    joined_df = batch_df.join(broadcast(small_df), ...)
    final_df = joined_df.join(big_side_df, ...)
    return final_df

def process_each_batch(self, batch_df, _):
    transformed_df = self.transform(batch_df)
    # Upsert (MERGE) the transformed batch into the target table
    transformed_df.upsert()

(self.df.writeStream.format(DeltaUtils.FORMAT_DELTA)
    .option("checkpointLocation", self.checkpoint_loc)
    .foreachBatch(self.process_each_batch)
    .outputMode("update")
    .trigger(availableNow=True)
    .start())
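For context, upsert() above is just my shorthand for a Delta MERGE into the target table. A minimal sketch of that step, assuming a hypothetical target_path and an "id" join key (neither is from my real job):

from delta.tables import DeltaTable

def upsert(spark, transformed_df, target_path):
    # MERGE the batch into the target Delta table on a hypothetical "id" key
    target = DeltaTable.forPath(spark, target_path)
    (target.alias("t")
        .merge(transformed_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())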
I am also fairly new to Spark and want to optimize this job, but I am not sure where to focus.
The first things that come to mind are:
- Adding maxBytesPerTrigger to limit how much data each micro-batch reads
- Reading the side tables outside of foreachBatch and caching them (see the sketch after this list)
- Doing the aggregations and preparing the big DataFrames to be consumed outside of this job (with another job in the workflow)
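To make the first two ideas concrete, here is a minimal sketch of what I mean: reading and caching the side tables once outside foreachBatch, and capping the micro-batch size with maxBytesPerTrigger. The paths, the "join_key" column, and the "10g" limit are hypothetical placeholders, not values from my actual job.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Hypothetical placeholder paths, not from my real job
source_path = "/delta/source_table"
big_table_path = "/delta/big_side_table"      # the ~230 GB table
small_table_path = "/delta/small_side_table"
checkpoint_loc = "/checkpoints/cdf_job"

# Idea 2: read the side tables once, before the stream starts, and cache them.
# Caching 230 GB only pays off if the cluster has enough memory/disk for it;
# otherwise Spark spills or recomputes, so this would need benchmarking.
big_side_df = spark.read.format("delta").load(big_table_path).cache()
small_side_df = spark.read.format("delta").load(small_table_path).cache()

def process_each_batch(batch_df, batch_id):
    # The cached side tables are reused here instead of being re-read per batch
    final_df = (batch_df
                .join(broadcast(small_side_df), "join_key")
                .join(big_side_df, "join_key"))
    # ... upsert final_df into the target table as above ...

(spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    # Idea 1: cap how much CDF data each micro-batch pulls in
    .option("maxBytesPerTrigger", "10g")
    .load(source_path)
    .writeStream
    .option("checkpointLocation", checkpoint_loc)
    .foreachBatch(process_each_batch)
    .outputMode("update")
    .trigger(availableNow=True)
    .start())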
Any suggestion would be appreciated.