@Krishnamoorthy Natarajan: Please try the foreachBatch() method to apply custom processing to the output of each micro-batch. Sample code is below:
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define your schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
# Define your streaming data source
input_path = "/mnt/input-folder/*/*/*.csv"
df = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv(input_path).withColumn("input_file", input_file_name())
# Define the foreachBatch function to write to Delta
def write_to_delta(batch_df, epoch_id):
    # Skip empty micro-batches
    first_row = batch_df.select("input_file").first()
    if first_row is None:
        return
    # Get the input file path (with maxFilesPerTrigger=1, each micro-batch comes from a single file)
    input_file = first_row[0]
    # Derive the output path from the two parent folders of the input file
    parts = input_file.split("/")
    output_path = "/mnt/output-folder/" + parts[-3] + "/" + parts[-2]
    # Write the micro-batch to Delta
    batch_df.write.format("delta").mode("append").save(output_path)
# Start the streaming query, applying write_to_delta to each micro-batch
df.writeStream.foreachBatch(write_to_delta).start().awaitTermination()
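You may also want to set a checkpoint location so the query can recover its progress after a restart; without it, Spark uses a temporary checkpoint and re-processing can occur. A minimal variant of the last line, assuming a placeholder checkpoint path that you would replace with your own durable storage location:
# Hypothetical checkpoint path - replace with a durable location you control
checkpoint_path = "/mnt/output-folder/_checkpoints/csv-to-delta"
(df.writeStream
   .option("checkpointLocation", checkpoint_path)
   .foreachBatch(write_to_delta)
   .start()
   .awaitTermination())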