<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: Reading two big tables within each forEachBatch processing method in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/reading-two-big-tables-within-each-foreachbatch-processing/m-p/100010#M40167</link>
    <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/133636"&gt;@mjedy7&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;For caching in this scenario, you could try to leverage persist() and unpersist() on the big table/Spark DataFrame; see here:&lt;/P&gt;&lt;P&gt;&lt;A href="https://medium.com/@eloutmadiabderrahim/persist-vs-unpersist-in-spark-485694f72452" target="_blank"&gt;https://medium.com/@eloutmadiabderrahim/persist-vs-unpersist-in-spark-485694f72452&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Try to reduce the amount of data in the big Spark DataFrame you cache by reading only the necessary columns, filtering the data (if possible), precomputing aggregates, etc. Run VACUUM and OPTIMIZE on your table regularly, and consider Z-ordering the data to help Spark with file skipping/pruning as well.&lt;/P&gt;&lt;P&gt;Broadcasting the small table might be a good idea.&lt;/P&gt;&lt;P&gt;Setting maxBytesPerTrigger/maxFilesPerTrigger is definitely a good idea.&lt;/P&gt;&lt;P&gt;Make sure your upsert is performing well.&lt;/P&gt;&lt;P&gt;While the job is running, use the &lt;STRONG&gt;Spark UI&lt;/STRONG&gt; to validate performance:&lt;/P&gt;&lt;P&gt;- monitor %CPU usage on each node and make sure your job utilizes all CPUs evenly,&lt;/P&gt;&lt;P&gt;- check the number of tasks processed during job execution to see whether you need to repartition/coalesce your input data or enable AQE (Adaptive Query Execution).&lt;/P&gt;</description>
    <pubDate>Mon, 25 Nov 2024 21:05:32 GMT</pubDate>
    <dc:creator>radothede</dc:creator>
    <dc:date>2024-11-25T21:05:32Z</dc:date>
    <item>
      <title>Reading two big tables within each forEachBatch processing method</title>
      <link>https://community.databricks.com/t5/data-engineering/reading-two-big-tables-within-each-foreachbatch-processing/m-p/99930#M40147</link>
      <description>&lt;P class=""&gt;I am reading changes from the cdf with &lt;SPAN class=""&gt;availableOnce=True&lt;/SPAN&gt;, processing data from checkpoint to checkpoint. During each batch, I perform transformations, but I also need to read two large tables and one small table. Does Spark read these tables from scratch for each batch, or caching internally?&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Read
df = self.spark.readStream.format("delta").option("readChangeFeed", "true")

# Transform
def transform(batch_df):
   # Read Delta table (~230 GB)
   big_side_df = spark.read.format("delta")...
   # Read small df 

   # Do aggregations
   batch_df = batch_df.join(broadcast(small))
   final_df = batch_df.join(big_side_df)....
   return final_df

def process_each_batch(df, _):
   transformed_df = transform(df)

   # Do the upsert into the target table
   transformed_df.upsert()


self.df.writeStream.format(DeltaUtils.FORMAT_DELTA)
            .option("checkpointLocation", self.checkpoint_loc)
            .foreachBatch(self.process_each_batch)
            .outputMode("update")
            .trigger(availableNow=True)
            .start()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Also, I am fairly new to this and want to optimize the job, but I am not sure where to focus.&lt;/P&gt;&lt;P&gt;The first things that come to mind are:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Adding maxBytesPerTrigger&lt;/LI&gt;&lt;LI&gt;Reading the tables outside of each batch and caching them?&lt;/LI&gt;&lt;LI&gt;Doing the aggregations and preparing the big DataFrames to be consumed outside of the job (with another job in the workflow)&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Any suggestions would be appreciated.&lt;/P&gt;</description>
      <pubDate>Mon, 25 Nov 2024 07:45:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/reading-two-big-tables-within-each-foreachbatch-processing/m-p/99930#M40147</guid>
      <dc:creator>mjedy7</dc:creator>
      <dc:date>2024-11-25T07:45:02Z</dc:date>
    </item>
    <item>
      <title>Re: Reading two big tables within each forEachBatch processing method</title>
      <link>https://community.databricks.com/t5/data-engineering/reading-two-big-tables-within-each-foreachbatch-processing/m-p/100010#M40167</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/133636"&gt;@mjedy7&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;For caching in this scenario, you could try to leverage persist() and unpersist() on the big table/Spark DataFrame; see here:&lt;/P&gt;&lt;P&gt;&lt;A href="https://medium.com/@eloutmadiabderrahim/persist-vs-unpersist-in-spark-485694f72452" target="_blank"&gt;https://medium.com/@eloutmadiabderrahim/persist-vs-unpersist-in-spark-485694f72452&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Try to reduce the amount of data in the big Spark DataFrame you cache by reading only the necessary columns, filtering the data (if possible), precomputing aggregates, etc. Run VACUUM and OPTIMIZE on your table regularly, and consider Z-ordering the data to help Spark with file skipping/pruning as well.&lt;/P&gt;&lt;P&gt;Broadcasting the small table might be a good idea.&lt;/P&gt;&lt;P&gt;Setting maxBytesPerTrigger/maxFilesPerTrigger is definitely a good idea.&lt;/P&gt;&lt;P&gt;Make sure your upsert is performing well.&lt;/P&gt;&lt;P&gt;While the job is running, use the &lt;STRONG&gt;Spark UI&lt;/STRONG&gt; to validate performance:&lt;/P&gt;&lt;P&gt;- monitor %CPU usage on each node and make sure your job utilizes all CPUs evenly,&lt;/P&gt;&lt;P&gt;- check the number of tasks processed during job execution to see whether you need to repartition/coalesce your input data or enable AQE (Adaptive Query Execution).&lt;/P&gt;</description>
      <pubDate>Mon, 25 Nov 2024 21:05:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/reading-two-big-tables-within-each-foreachbatch-processing/m-p/100010#M40167</guid>
      <dc:creator>radothede</dc:creator>
      <dc:date>2024-11-25T21:05:32Z</dc:date>
    </item>
  </channel>
</rss>

