<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Delta write stream to different folders dynamically based on input file in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/delta-write-stream-to-different-folders-dynamically-based-on/m-p/9552#M4891</link>
    <description>&lt;P&gt;I have a root folder, and files are being ingested into its subfolders. I want to build a workflow that writes the stream to a different output folder based on the file being ingested.&lt;/P&gt;</description>
    <pubDate>Sat, 11 Feb 2023 09:28:46 GMT</pubDate>
    <dc:creator>Krishna264</dc:creator>
    <dc:date>2023-02-11T09:28:46Z</dc:date>
    <item>
      <title>Delta write stream to different folders dynamically based on input file</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-write-stream-to-different-folders-dynamically-based-on/m-p/9552#M4891</link>
      <description>&lt;P&gt;I have a root folder, and files are being ingested into its subfolders. I want to build a workflow that writes the stream to a different output folder based on the file being ingested.&lt;/P&gt;</description>
      <pubDate>Sat, 11 Feb 2023 09:28:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-write-stream-to-different-folders-dynamically-based-on/m-p/9552#M4891</guid>
      <dc:creator>Krishna264</dc:creator>
      <dc:date>2023-02-11T09:28:46Z</dc:date>
    </item>
    <item>
      <title>Re: Delta write stream to different folders dynamically based on input file</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-write-stream-to-different-folders-dynamically-based-on/m-p/9553#M4892</link>
      <description>&lt;P&gt;@Krishnamoorthy Natarajan​&amp;nbsp;: Please try to use the foreachBatch() method to apply custom processing on the output data of each micro-batch. Sample code is as below&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
&amp;nbsp;
# Define your schema: two columns, "name" (string) and "age" (integer).
# The third StructField argument (True) marks the column as nullable.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
&amp;nbsp;
# Define your streaming data source
# The glob matches CSV files exactly two directory levels below /mnt/input-folder.
input_path = "/mnt/input-folder/*/*/*.csv"
# maxFilesPerTrigger=1 limits each micro-batch to at most one new file, and
# input_file_name() stores every row's source file path in an "input_file" column.
df = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv(input_path).withColumn("input_file", input_file_name())
&amp;nbsp;
# Define the foreachBatch function to write to Delta
def write_to_delta(df, epoch_id):
    """Append one micro-batch to the Delta folder derived from its input file path.

    Meant to be passed to writeStream.foreachBatch().

    Args:
        df: Micro-batch DataFrame; must carry an "input_file" column holding
            the source file path of each row.
        epoch_id: Micro-batch id supplied by foreachBatch (not used here).

    Returns:
        None. A batch with no rows is skipped without writing anything.
    """
    # first() returns None when the batch has no rows (for example an empty
    # input file); skip the batch instead of failing on None[0].
    first_row = df.select("input_file").first()
    if first_row is None:
        return

    # Only the first row's path is inspected, so all rows in the batch are
    # assumed to come from the same sub-folder.
    path_parts = first_row[0].split("/")

    # Reuse the two directory levels directly above the file name as the
    # sub-path under the output root.
    output_path = "/mnt/output-folder/" + path_parts[-3] + "/" + path_parts[-2]

    # Write the data to Delta
    df.write.format("delta").mode("append").option("path", output_path).save()
&amp;nbsp;
# Apply the foreachBatch function on the output data
df.writeStream.foreachBatch(write_to_delta).start().awaitTermination()&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 07 Mar 2023 08:28:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-write-stream-to-different-folders-dynamically-based-on/m-p/9553#M4892</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2023-03-07T08:28:18Z</dc:date>
    </item>
    <item>
      <title>Re: Delta write stream to different folders dynamically based on input file</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-write-stream-to-different-folders-dynamically-based-on/m-p/9554#M4893</link>
      <description>&lt;P&gt;Hi @Krishnamoorthy Natarajan​&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thank you for your question! To assist you better, please take a moment to review the answer and let me know if it best fits your needs.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Please help us select the best solution by clicking on "Select As Best" if it does.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Your feedback will help us ensure that we are providing the best possible service to you. Thank you!&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Sun, 09 Apr 2023 07:05:30 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-write-stream-to-different-folders-dynamically-based-on/m-p/9554#M4893</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2023-04-09T07:05:30Z</dc:date>
    </item>
  </channel>
</rss>

