Delta write stream to different folders dynamically based on input file

I have a root folder, and files are being ingested into its subfolders. I want to build a workflow that writes the stream to a different Delta folder depending on which file is being ingested.


@Krishnamoorthy Natarajan: Try using the foreachBatch() method, which lets you apply custom processing to the output of each micro-batch. Sample code:

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define your schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Define your streaming data source (root folder / subfolder / subfolder / file.csv)
input_path = "/mnt/input-folder/*/*/*.csv"
df = (spark.readStream.schema(schema)
      .option("maxFilesPerTrigger", 1)  # at most one file per micro-batch
      .csv(input_path)
      .withColumn("input_file", input_file_name()))

# Define the foreachBatch function to write to Delta
def write_to_delta(batch_df, epoch_id):
    # Get the input file path; skip the batch if it is empty
    row = batch_df.select("input_file").first()
    if row is None:
        return
    input_file = row[0]
    # Define the output path from the file's two parent subfolders
    parts = input_file.split("/")
    output_path = "/mnt/output-folder/" + parts[-3] + "/" + parts[-2]
    # Write the data to Delta
    batch_df.write.format("delta").mode("append").save(output_path)

# Apply the foreachBatch function on the output data
query = (df.writeStream
         .foreachBatch(write_to_delta)
         .option("checkpointLocation", "/mnt/checkpoints/input-folder")  # hypothetical path
         .start())
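The path arithmetic in write_to_delta (split("/")[-3] and [-2]) is easy to get wrong, so here is a plain-Python sketch of it that can be checked without a cluster. It assumes file paths shaped like the input_path glob in the sample (root/subfolder/subfolder/file.csv). The functions output_path_for and group_by_output_path, the OUTPUT_ROOT constant, and the example file names are hypothetical illustrations, not Databricks or Spark APIs.

```python
from collections import defaultdict

# Hypothetical output root, mirroring "/mnt/output-folder" in the sample code.
OUTPUT_ROOT = "/mnt/output-folder"


def output_path_for(input_file: str, output_root: str = OUTPUT_ROOT) -> str:
    """Map an input file path to its Delta output folder.

    Keeps the file's two parent folders (index -3 and -2 after splitting
    on "/") and drops the file name, as in write_to_delta.
    """
    parts = input_file.split("/")
    if len(parts) < 3:
        raise ValueError(f"expected at least two parent folders: {input_file!r}")
    return f"{output_root}/{parts[-3]}/{parts[-2]}"


def group_by_output_path(input_files):
    """Group file paths by destination folder.

    Relevant if a micro-batch may contain several files (for example,
    without maxFilesPerTrigger=1): each group would need its own write.
    """
    groups = defaultdict(list)
    for f in input_files:
        groups[output_path_for(f)].append(f)
    return dict(groups)


if __name__ == "__main__":
    files = [
        "dbfs:/mnt/input-folder/sales/2023/a.csv",
        "dbfs:/mnt/input-folder/sales/2023/b.csv",
        "dbfs:/mnt/input-folder/hr/2024/c.csv",
    ]
    for path, members in group_by_output_path(files).items():
        print(path, members)
```

In the streaming job, one possible variation (not part of the original answer) would be to collect the distinct input_file values of each batch, filter the batch per destination, and append each filtered DataFrame to its own output path. That would remove the need for maxFilesPerTrigger=1, at the cost of one Delta write per destination per micro-batch.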

