<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Distinguishing stream workload from batch workload in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/distinguishing-stream-workload-from-batch-work-load/m-p/108324#M43034</link>
    <description>&lt;P&gt;Is it possible to use the same data source for both batch and streaming data? The code below, which I found on the internet, handles both streaming and batch workloads. Please find the corresponding PDF file attached.&lt;/P&gt;
&lt;P&gt;I am familiar with the processingTime trigger and the availableNow trigger. How do these two triggers distinguish a batch workload from a streaming workload? The availableNow trigger supports batch workloads, but how does it distinguish them from streaming workloads?&lt;/P&gt;
&lt;DIV&gt;import time&lt;BR /&gt;
&lt;BR /&gt;
class invoiceStreamBatch():&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def __init__(self):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;self.base_data_dir = "/FileStore/data_spark_streaming_scholarnest"&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def readInvoices(self):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return (spark.readStream&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.format("json")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.schema(self.getSchema())&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.load(f"{self.base_data_dir}/data/invoices")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def appendInvoices(self, flattenedDF, trigger = "batch"):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;sQuery = (flattenedDF.writeStream&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.format("delta")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("checkpointLocation", f"{self.base_data_dir}/chekpoint/invoices")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.outputMode("append")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("maxFilesPerTrigger", 1)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;if (trigger == "batch"):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return (sQuery.trigger(availableNow = True)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.toTable("invoice_line_items"))&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;else:&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return (sQuery.trigger(processingTime = trigger)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.toTable("invoice_line_items"))&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def process(self, trigger = "batch"):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;print("Starting Invoice Processing Stream...", end='')&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;invoicesDF = self.readInvoices()&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;# The flattening step is not shown in this post, so the source DataFrame is passed directly.&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;sQuery = self.appendInvoices(invoicesDF, trigger)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;print("Done\n")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return sQuery&lt;/DIV&gt;</description>
    <pubDate>Sun, 02 Feb 2025 01:54:51 GMT</pubDate>
    <dc:creator>subhas_hati</dc:creator>
    <dc:date>2025-02-02T01:54:51Z</dc:date>
    <item>
      <title>Distinguishing stream workload from batch workload</title>
      <link>https://community.databricks.com/t5/data-engineering/distinguishing-stream-workload-from-batch-work-load/m-p/108324#M43034</link>
      <description>&lt;P&gt;Is it possible to use the same data source for both batch and streaming data? The code below, which I found on the internet, handles both streaming and batch workloads. Please find the corresponding PDF file attached.&lt;/P&gt;
&lt;P&gt;I am familiar with the processingTime trigger and the availableNow trigger. How do these two triggers distinguish a batch workload from a streaming workload? The availableNow trigger supports batch workloads, but how does it distinguish them from streaming workloads?&lt;/P&gt;
&lt;DIV&gt;import time&lt;BR /&gt;
&lt;BR /&gt;
class invoiceStreamBatch():&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def __init__(self):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;self.base_data_dir = "/FileStore/data_spark_streaming_scholarnest"&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def readInvoices(self):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return (spark.readStream&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.format("json")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.schema(self.getSchema())&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.load(f"{self.base_data_dir}/data/invoices")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def appendInvoices(self, flattenedDF, trigger = "batch"):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;sQuery = (flattenedDF.writeStream&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.format("delta")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("checkpointLocation", f"{self.base_data_dir}/chekpoint/invoices")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.outputMode("append")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("maxFilesPerTrigger", 1)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;if (trigger == "batch"):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return (sQuery.trigger(availableNow = True)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.toTable("invoice_line_items"))&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;else:&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return (sQuery.trigger(processingTime = trigger)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;.toTable("invoice_line_items"))&lt;BR /&gt;
&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;def process(self, trigger = "batch"):&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;print("Starting Invoice Processing Stream...", end='')&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;invoicesDF = self.readInvoices()&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;# The flattening step is not shown in this post, so the source DataFrame is passed directly.&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;sQuery = self.appendInvoices(invoicesDF, trigger)&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;print("Done\n")&lt;BR /&gt;
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;return sQuery&lt;/DIV&gt;</description>
      <pubDate>Sun, 02 Feb 2025 01:54:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/distinguishing-stream-workload-from-batch-work-load/m-p/108324#M43034</guid>
      <dc:creator>subhas_hati</dc:creator>
      <dc:date>2025-02-02T01:54:51Z</dc:date>
    </item>
    <item>
      <title>Re: Distinguishing stream workload from batch workload</title>
      <link>https://community.databricks.com/t5/data-engineering/distinguishing-stream-workload-from-batch-work-load/m-p/108326#M43036</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/145131"&gt;@subhas_hati&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Thanks for your question:&lt;/P&gt;
&lt;OL class="ol1"&gt;
&lt;LI class="li1"&gt;&lt;STRONG&gt;Batch Workload&lt;/STRONG&gt;: The availableNow trigger is used for batch-style processing. When you set availableNow=True, the query processes all data that is available when it starts, possibly across several micro-batches (it honors source rate limits such as maxFilesPerTrigger), and then stops. This is useful when you want to process everything available at a specific point in time and then terminate the job.&lt;/LI&gt;
&lt;LI class="li1"&gt;&lt;STRONG&gt;Streaming Workload&lt;/STRONG&gt;: The processingTime trigger is used for streaming workloads. When you set the trigger to a specific time interval (e.g., processingTime='10 seconds'), it processes data in micro-batches at the specified interval. This allows for continuous processing of incoming data in near-real-time.&lt;/LI&gt;
&lt;/OL&gt;
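&lt;P class="p1"&gt;As a rough sketch of the two choices above, the mapping from a single trigger argument to the keyword arguments of DataStreamWriter.trigger() could look like the following. The helper name trigger_kwargs is hypothetical and is not part of Spark or of the code in the question; the sketch is plain Python so it runs without a cluster.&lt;/P&gt;
&lt;PRE&gt;
```python
def trigger_kwargs(trigger="batch"):
    """Return keyword arguments for DataStreamWriter.trigger().

    Hypothetical helper for illustration only: "batch" selects the
    availableNow trigger, any other value is passed through as a
    processingTime interval string such as "10 seconds".
    """
    if trigger == "batch":
        # Process whatever the source holds when the query starts, then stop.
        return {"availableNow": True}
    # Keep the query running and start a micro-batch at each interval.
    return {"processingTime": trigger}
```
&lt;/PRE&gt;
&lt;P class="p1"&gt;Assuming a streaming writer such as the sQuery object in the question, the result could be applied as sQuery.trigger(**trigger_kwargs(trigger)).toTable("invoice_line_items"). In both cases the query is still a Structured Streaming query; only the trigger differs.&lt;/P&gt;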
&lt;P class="p1"&gt;In your code, both modes read the same source through spark.readStream; the appendInvoices method distinguishes between batch and streaming workloads only through the trigger parameter:&lt;/P&gt;
&lt;UL class="ul1"&gt;
&lt;LI class="li1"&gt;If trigger is set to "batch", it uses the availableNow trigger.&lt;/LI&gt;
&lt;LI class="li1"&gt;Otherwise, it uses the processingTime trigger with the specified interval.&lt;/LI&gt;
&lt;/UL&gt;</description>
      <pubDate>Sun, 02 Feb 2025 02:11:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/distinguishing-stream-workload-from-batch-work-load/m-p/108326#M43036</guid>
      <dc:creator>Alberto_Umana</dc:creator>
      <dc:date>2025-02-02T02:11:42Z</dc:date>
    </item>
  </channel>
</rss>

