Is it possible to use the same data source for both batch data and stream data? I found the following code on the internet; it handles both stream and batch workloads. Please find the corresponding PDF file attached.
I am familiar with the processingTime trigger and the availableNow trigger. How do these two triggers distinguish a batch workload from a stream workload? The availableNow trigger supports batch workloads, but how does it distinguish them from stream workloads?
 
import time

class invoiceStreamBatch():
    def __init__(self):
        self.base_data_dir = "/FileStore/data_spark_streaming_scholarnest"

    def readInvoices(self):
        # The source is always read as a stream; getSchema() is not shown in this snippet
        return (spark.readStream
                    .format("json")
                    .schema(self.getSchema())
                    .load(f"{self.base_data_dir}/data/invoices")
               )

    def appendInvoices(self, flattenedDF, trigger = "batch"):
        sQuery = (flattenedDF.writeStream
                    .format("delta")
                    .option("checkpointLocation", f"{self.base_data_dir}/chekpoint/invoices")
                    .outputMode("append")
                    .option("maxFilesPerTrigger", 1)
                 )
        # "batch": process all data available right now, then stop the query
        if (trigger == "batch"):
            return ( sQuery.trigger(availableNow = True)
                        .toTable("invoice_line_items"))
        # anything else is treated as a processing-time interval, e.g. "10 seconds"
        else:
            return ( sQuery.trigger(processingTime = trigger)
                        .toTable("invoice_line_items"))

    def process(self, trigger = "batch"):
        print(f"Starting Invoice Processing Stream...", end='')
        invoicesDF = self.readInvoices()
        # No transformation step is shown in this snippet, so the stream is written as read
        sQuery = self.appendInvoices(invoicesDF, trigger)
        print("Done\n")
        return sQuery
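
To make the part I am asking about concrete, here is a minimal sketch (not part of the code I found) of the only decision that appendInvoices makes. The helper name trigger_kwargs is hypothetical; availableNow and processingTime are the keyword arguments of DataStreamWriter.trigger() already used above. As far as I can tell, the source is read with readStream in both cases, and only the trigger decides whether the query stops after processing the data that is currently available or keeps running on an interval.

```python
def trigger_kwargs(trigger="batch"):
    """Map the snippet's `trigger` argument to keyword arguments
    for DataStreamWriter.trigger().

    `trigger_kwargs` is a hypothetical helper used only for illustration.
    """
    if trigger == "batch":
        # Batch-style run: consume what is available now, then stop
        return {"availableNow": True}
    # Stream-style run: `trigger` is an interval string such as "10 seconds"
    return {"processingTime": trigger}


print(trigger_kwargs("batch"))
print(trigger_kwargs("10 seconds"))
```

With such a helper, both branches of appendInvoices would reduce to sQuery.trigger(**trigger_kwargs(trigger)).toTable("invoice_line_items").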