<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Large Data ingestion issue using auto loader in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39313#M26914</link>
    <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/86042"&gt;@bzh&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I strongly suspect that a single file contains the 3 million records. To confirm whether the data comes from one file or from several, could you add a new column populated with input_file_name()? That would tell us whether the maxFilesPerTrigger setting is being honored.&lt;/P&gt;&lt;P&gt;Documentation:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/sql/language-manual/functions/input_file_name.html" target="_blank"&gt;https://docs.databricks.com/en/sql/language-manual/functions/input_file_name.html&lt;/A&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 08 Aug 2023 04:22:07 GMT</pubDate>
    <dc:creator>Tharun-Kumar</dc:creator>
    <dc:date>2023-08-08T04:22:07Z</dc:date>
    <item>
      <title>Large Data ingestion issue using auto loader</title>
      <link>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39295#M26913</link>
      <description>&lt;P&gt;The goal of this project is to ingest 1000+ files (100 MB per file) from S3 into Databricks. Because the data will arrive incrementally, we are using Auto Loader for continued ingestion and transformation on a cluster (i3.xlarge).&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;The current process is very slow. It feels like it might take days to complete.&lt;/STRONG&gt;&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Each file has about 100,000+ rows. But when I run the code, the Spark UI shows over 3,000,000 rows (see attachment) being processed altogether, even though I set maxFilesPerTrigger=1.&lt;/LI&gt;&lt;LI&gt;We are using UDFs. We understand that UDFs can be costly compared to native PySpark DataFrame operations, but some of the Python logic is a row-level transformation that is quite hard to express as DataFrame operations.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;Here is the flow of our code:&lt;/P&gt;&lt;P&gt;# read stream from s3 mount&lt;BR /&gt;df = (&lt;BR /&gt;spark.readStream.format("cloudFiles")&lt;BR /&gt;.option("cloudFiles.format", "csv")&lt;BR /&gt;.option("sep", "||||")&lt;BR /&gt;.schema(transaction_schema)&lt;BR /&gt;.option("maxFilesPerTrigger", 1)&lt;BR /&gt;.load("dbfs:/mnt/s3/public/test_transactions")&lt;BR /&gt;)&lt;/P&gt;&lt;P&gt;# load contracts_df once outside the streaming operation&lt;BR /&gt;contracts_df = spark.read.table("hive_metastore.spindl.contracts")&lt;/P&gt;&lt;P&gt;# apply transformations to the entire streaming DataFrame&lt;BR /&gt;df = df.withColumn("transaction_hash", F.col("id"))&lt;BR /&gt;# ... more transformations ...&lt;/P&gt;&lt;P&gt;# define the UDFs&lt;BR /&gt;contract_info_udf = F.udf(contract_info, ...&lt;/P&gt;&lt;P&gt;# apply the UDFs&lt;BR /&gt;df = df.withColumn("contract_info", contract_info_udf(F.struct(contracts_df.columns)))&lt;BR /&gt;# ... more transformations ...&lt;/P&gt;&lt;P&gt;# write into transactions table&lt;BR /&gt;df.write.mode("append").insertInto("hive_metastore.spindl.test_transactions")&lt;/P&gt;&lt;P&gt;# write the stream&lt;BR /&gt;query = df.writeStream\&lt;BR /&gt;.format("delta")\&lt;BR /&gt;.option("checkpointLocation", "/tmp/delta/test/_write_stream_checkpoints/")\&lt;BR /&gt;.start()&lt;/P&gt;&lt;P&gt;query.awaitTermination()&lt;/P&gt;</description>
      <pubDate>Mon, 07 Aug 2023 20:29:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39295#M26913</guid>
      <dc:creator>bzh</dc:creator>
      <dc:date>2023-08-07T20:29:59Z</dc:date>
    </item>
    <item>
      <title>Re: Large Data ingestion issue using auto loader</title>
      <link>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39313#M26914</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/86042"&gt;@bzh&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I strongly suspect that a single file contains the 3 million records. To confirm whether the data comes from one file or from several, could you add a new column populated with input_file_name()? That would tell us whether the maxFilesPerTrigger setting is being honored.&lt;/P&gt;&lt;P&gt;Documentation:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/sql/language-manual/functions/input_file_name.html" target="_blank"&gt;https://docs.databricks.com/en/sql/language-manual/functions/input_file_name.html&lt;/A&gt;&lt;/P&gt;</description>
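      <content:encoded><![CDATA[
The check suggested in this reply could be sketched roughly as follows. This assumes the streaming DataFrame from the original post is called `df`; the column name `source_file`, both helper functions, and the sample paths are illustrative only. The one Spark call used is `input_file_name()`, from the documentation linked above.

```python
from collections import Counter


def with_source_file(df):
    """Tag each row with the file it was read from (column name is illustrative)."""
    # Imported here so the pure-Python helper below also works without Spark.
    from pyspark.sql import functions as F

    return df.withColumn("source_file", F.input_file_name())


def rows_per_file(source_files):
    """Count rows per source file from an iterable of file paths."""
    return Counter(source_files)


# Hypothetical sample standing in for the `source_file` values of one micro-batch.
sample = ["s3://bucket/a.csv", "s3://bucket/a.csv", "s3://bucket/b.csv"]
counts = rows_per_file(sample)
print(len(counts), "distinct files:", dict(counts))
```

If a single micro-batch shows more than one distinct `source_file`, the per-trigger file limit is probably not being applied. Auto Loader documents its own rate-limit option as `cloudFiles.maxFilesPerTrigger`, so that spelling may be worth checking as well.
]]></content:encoded>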
      <pubDate>Tue, 08 Aug 2023 04:22:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39313#M26914</guid>
      <dc:creator>Tharun-Kumar</dc:creator>
      <dc:date>2023-08-08T04:22:07Z</dc:date>
    </item>
    <item>
      <title>Re: Large Data ingestion issue using auto loader</title>
      <link>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39369#M26940</link>
      <description>&lt;P&gt;It looks like the 3 million records are coming from one file. To process that many records, you might need more cores in your cluster.&lt;/P&gt;</description>
      <pubDate>Tue, 08 Aug 2023 14:45:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39369#M26940</guid>
      <dc:creator>Lakshay</dc:creator>
      <dc:date>2023-08-08T14:45:17Z</dc:date>
    </item>
    <item>
      <title>Re: Large Data ingestion issue using auto loader</title>
      <link>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39373#M26942</link>
      <description>&lt;P&gt;There are several ways to improve the performance of a Spark streaming job that ingests a large volume of S3 files. Here are a few suggestions:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Tune the spark.sql.shuffle.partitions config parameter:&lt;P&gt;By default, the number of shuffle partitions is 200, which may be too low for your workload. This parameter controls how many partitions Spark uses when shuffling data between stages; too few partitions can lead to low parallelism and slow performance. Try raising it, based on the size of your data and the number of cores in your cluster, to increase parallelism.&lt;/P&gt;&lt;P&gt;For example:&lt;/P&gt;&lt;PRE&gt;spark.conf.set("spark.sql.shuffle.partitions", "1000")&lt;/PRE&gt;&lt;/LI&gt;&lt;LI&gt;Coalesce the output DataFrame before writing to Delta Lake:&lt;P&gt;Each write to Delta Lake adds new data files to the table and then records them in the transaction log. If every write produces many small files, both the write itself and later reads slow down. To mitigate this, you could coalesce the output DataFrame to reduce the number of files created.&lt;/P&gt;&lt;P&gt;For example:&lt;/P&gt;&lt;PRE&gt;df.coalesce(16).write.mode("append").format("delta")...&lt;/PRE&gt;&lt;/LI&gt;&lt;LI&gt;Simplify or optimize your UDFs:&lt;P&gt;Row-level transformations in UDFs can be expensive, especially if the function is not optimized. You can try optimizing your UDFs by:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Broadcasting small DataFrames so that they are available on all nodes.&lt;/LI&gt;&lt;LI&gt;Using vectorized (pandas) UDFs.&lt;/LI&gt;&lt;LI&gt;Avoiding Python UDFs where possible.&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;Use larger instance types or adjust autoscaling:&lt;P&gt;You may want to consider larger instance types. Alternatively, you could adjust your autoscaling policy so that the cluster size scales with the incoming workload. The Databricks autoscaling feature can help maximize utilization and reduce overall cluster costs.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;Use Delta's auto-optimize feature:&lt;P&gt;Consider explicitly turning on Delta Lake's auto-optimize features. As you keep ingesting data, options such as optimized writes and auto compaction can improve query performance.&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;These are some of the best practices for improving the performance of a Spark streaming application that ingests large volumes of data from S3 using Auto Loader.&lt;/P&gt;&lt;P&gt;I hope that helps!&lt;/P&gt;</description>
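      <content:encoded><![CDATA[
As a rough illustration of the last suggestion, auto optimize can be switched on for a session with two Databricks settings (optimized writes and auto compaction). The helper name `apply_confs` and the dictionary name are hypothetical, and whether these settings actually help this particular workload is an assumption to verify.

```python
# Session-level settings for Databricks auto optimize (optimized writes and
# auto compaction). Whether they help this workload is an assumption to verify.
AUTO_OPTIMIZE_CONFS = {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
}


def apply_confs(spark, confs=AUTO_OPTIMIZE_CONFS):
    """Set each key/value on the session via spark.conf.set; return the keys set."""
    for key, value in confs.items():
        spark.conf.set(key, value)
    return list(confs)
```

In a notebook, `apply_confs(spark)` would be called once before the stream is started.
]]></content:encoded>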
      <pubDate>Tue, 08 Aug 2023 15:44:40 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/large-data-ingestion-issue-using-auto-loader/m-p/39373#M26942</guid>
      <dc:creator>youssefmrini</dc:creator>
      <dc:date>2023-08-08T15:44:40Z</dc:date>
    </item>
  </channel>
</rss>

