Is it possible to use the same data source for both batch and streaming data? Please see the following code, which I found on the internet; it handles both stream and batch workloads. The corresponding PDF file is attached.
I am familiar with the processingTime trigger and the availableNow trigger. How do these two triggers distinguish a batch workload from a streaming workload? The available-now trigger supports batch workloads, but how is it distinguished from a streaming workload?
import time
class invoiceStreamBatch():
    """Read invoice JSON files as a stream and append them to a Delta table.

    The same pipeline serves both batch and streaming workloads: the
    ``trigger`` argument selects between a run-once available-now trigger
    (batch semantics: process everything pending, then stop) and a
    processing-time interval (continuous micro-batch streaming).
    """

    def __init__(self):
        # Root directory for input data and streaming checkpoints.
        # BUG FIX: the original line was garbled ("def __init__(self😞") —
        # the closing "):" had been mangled into an emoji by a bad paste.
        self.base_data_dir = "/FileStore/data_spark_streaming_scholarnest"

    def readInvoices(self):
        """Return a streaming DataFrame over the invoice JSON drop folder.

        NOTE(review): relies on a global `spark` session and on a
        `getSchema()` method that is not defined in this class — confirm
        both exist elsewhere in the notebook/file.
        """
        return (spark.readStream
                .format("json")
                .schema(self.getSchema())
                .load(f"{self.base_data_dir}/data/invoices")
                )

    def appendInvoices(self, flattenedDF, trigger="batch"):
        """Start a writeStream appending `flattenedDF` to invoice_line_items.

        trigger == "batch"  -> availableNow=True: drain all currently
                               available files, then stop (batch workload).
        any other value     -> interpreted as a processingTime interval
                               string (e.g. "30 seconds") for continuous
                               streaming.

        Returns the started StreamingQuery.
        """
        # NOTE(review): "chekpoint" is a typo in the path, but correcting it
        # would orphan any existing checkpoint state — left as-is on purpose.
        sQuery = (flattenedDF.writeStream
                  .format("delta")
                  .option("checkpointLocation", f"{self.base_data_dir}/chekpoint/invoices")
                  .outputMode("append")
                  .option("maxFilesPerTrigger", 1)
                  )
        if trigger == "batch":
            # Available-now trigger: batch semantics on the streaming API.
            return (sQuery.trigger(availableNow=True)
                    .toTable("invoice_line_items"))
        # Processing-time trigger: run indefinitely, one micro-batch per interval.
        return (sQuery.trigger(processingTime=trigger)
                .toTable("invoice_line_items"))

    def process(self, trigger="batch"):
        """Wire the read and write sides together; return the running query."""
        print("Starting Invoice Processing Stream...", end='')
        invoicesDF = self.readInvoices()
        # BUG FIX: the original passed an undefined `resultDF` (NameError);
        # pass the DataFrame actually read above.
        sQuery = self.appendInvoices(invoicesDF, trigger)
        print("Done\n")
        return sQuery