Data Engineering

Distinguishing a streaming workload from a batch workload

subhas_hati
New Contributor

Is it possible for the same data source to serve both batch and streaming workloads? Please see the following code, which I found on the internet; it handles both stream and batch workloads. The corresponding PDF file is attached.

I am familiar with the processingTime trigger and the availableNow trigger. How do these two triggers distinguish a batch workload from a streaming workload? The availableNow trigger supports batch workloads, but how does it distinguish them from streaming workloads?

 

import time

class invoiceStreamBatch():
    def __init__(self):
        self.base_data_dir = "/FileStore/data_spark_streaming_scholarnest"

    def readInvoices(self):
        # maxFilesPerTrigger is a file-source (read) option, so it belongs
        # here on readStream rather than on writeStream
        return (spark.readStream
                     .format("json")
                     .schema(self.getSchema())  # getSchema() is defined elsewhere in the class
                     .option("maxFilesPerTrigger", 1)
                     .load(f"{self.base_data_dir}/data/invoices"))

    def appendInvoices(self, flattenedDF, trigger="batch"):
        sQuery = (flattenedDF.writeStream
                             .format("delta")
                             .option("checkpointLocation", f"{self.base_data_dir}/checkpoint/invoices")
                             .outputMode("append"))

        if trigger == "batch":
            # availableNow: process everything available now, then stop
            return (sQuery.trigger(availableNow=True)
                          .toTable("invoice_line_items"))
        else:
            # processingTime: run continuously, one micro-batch per interval
            return (sQuery.trigger(processingTime=trigger)
                          .toTable("invoice_line_items"))

    def process(self, trigger="batch"):
        print("Starting Invoice Processing Stream...", end='')
        invoicesDF = self.readInvoices()
        sQuery = self.appendInvoices(invoicesDF, trigger)  # was resultDF, which is undefined
        print("Done\n")
        return sQuery
 

Alberto_Umana
Databricks Employee

Hi @subhas_hati,

Thanks for your question:

  1. Batch Workload: The availableNow trigger is used for batch processing. When you set the trigger to availableNow, it processes all available data as a single batch and then stops. This is useful for scenarios where you want to process all the data available at a specific point in time and then terminate the job.
  2. Streaming Workload: The processingTime trigger is used for streaming workloads. When you set the trigger to a specific time interval (e.g., processingTime='10 seconds'), it processes data in micro-batches at the specified interval. This allows for continuous processing of incoming data in near real time (see the sketch below).
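
To make the distinction concrete, here is a minimal sketch of both trigger styles. availableNow requires Spark 3.3 or later (or a Databricks runtime that supports Trigger.AvailableNow); the schema, paths, and table name below are illustrative, not taken from your code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Illustrative schema and paths; substitute your own.
schema = StructType([
    StructField("InvoiceNumber", StringType()),
    StructField("TotalAmount", DoubleType()),
])

invoices = (spark.readStream
                 .format("json")
                 .schema(schema)
                 .load("/FileStore/demo/invoices"))

# Batch-style run: process every file present right now, then stop.
(invoices.writeStream
         .option("checkpointLocation", "/FileStore/demo/ckpt")
         .trigger(availableNow=True)
         .toTable("invoices_demo"))

# Streaming run: keep the query alive, one micro-batch every 10 seconds.
# (Run one or the other; both are shown here only for comparison.)
(invoices.writeStream
         .option("checkpointLocation", "/FileStore/demo/ckpt")
         .trigger(processingTime="10 seconds")
         .toTable("invoices_demo"))

The source is identical in both cases; only the trigger decides whether the query drains the backlog and exits or keeps polling for new files.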

In your code, the appendInvoices method distinguishes between batch and streaming workloads based on the trigger parameter:

  • If trigger is set to "batch", it uses the availableNow trigger.
  • Otherwise, it uses the processingTime trigger with the specified interval.
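
For example, here is a hypothetical usage sketch of your class (assuming getSchema() is implemented and the input directory exists); the 30-second interval is just an example value:

job = invoiceStreamBatch()

# Batch-style run: availableNow drains whatever is in the input
# directory, and the query then stops on its own.
batch_query = job.process(trigger="batch")
batch_query.awaitTermination()  # returns once the backlog is processed

# Streaming run: any other value is passed straight through as a
# processingTime interval, so the query keeps running and picks up
# new files every 30 seconds.
stream_query = job.process(trigger="30 seconds")
# stream_query.stop()  # stop the continuous query manually when done

Note that because both code paths write to the same checkpoint location, a one-off availableNow run and a later continuous run resume from the same progress log, which is what lets a single pipeline serve both workloads.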
