<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Spark StreamingQuery not processing all data from source directory in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/spark-streamingquery-not-processing-all-data-from-source/m-p/16166#M10377</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I have set up a streaming process that consumes files from an HDFS staging directory and writes them to a target location. The input directory continuously receives files from another process.&lt;/P&gt;&lt;P&gt;Let's say the file producer generates 5 million records and sends them to the HDFS staging directory as 50 different files within a 5-minute window. The streaming process picks up the files (processing trigger set to 60 seconds) and processes them all, but it is missing 2-3% of the records after processing all the files from the input directory.&lt;/P&gt;&lt;P&gt;I checked the checkpoint source directory and it shows that all the files were picked up and processed.&lt;/P&gt;&lt;P&gt;Not sure what else to check.&lt;/P&gt;&lt;P&gt;@Prakash Chockalingam&amp;nbsp;Can you please help?&lt;/P&gt;&lt;PRE&gt;Dataset&amp;lt;Row&amp;gt; dataDF = spark.readStream()
    .option("sep", delimiter)
    .option("header", "false")
    .option("inferSchema", "false")
    .option("latestFirst", "false")
    .schema(appConfig.getSchema())
    .csv(appConfig.getHdfsSourceDir());

StreamingQuery query = dataDF.writeStream()
    .trigger(ProcessingTime.create(60, TimeUnit.SECONDS))
    .format("parquet")
    .outputMode(OutputMode.Append())
    .partitionBy(appConfig.getPartitionedBy().split(","))
    .option("checkpointLocation", appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir() + "/checkpoint")
    .option("compression", appConfig.getSparkCompressType())
    .start(appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir());

query.awaitTermination();&lt;/PRE&gt;</description>
    <pubDate>Fri, 20 Aug 2021 12:55:04 GMT</pubDate>
    <dc:creator>RajaLakshmanan</dc:creator>
    <dc:date>2021-08-20T12:55:04Z</dc:date>
    <item>
      <title>Spark StreamingQuery not processing all data from source directory</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streamingquery-not-processing-all-data-from-source/m-p/16166#M10377</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I have set up a streaming process that consumes files from an HDFS staging directory and writes them to a target location. The input directory continuously receives files from another process.&lt;/P&gt;&lt;P&gt;Let's say the file producer generates 5 million records and sends them to the HDFS staging directory as 50 different files within a 5-minute window. The streaming process picks up the files (processing trigger set to 60 seconds) and processes them all, but it is missing 2-3% of the records after processing all the files from the input directory.&lt;/P&gt;&lt;P&gt;I checked the checkpoint source directory and it shows that all the files were picked up and processed.&lt;/P&gt;&lt;P&gt;Not sure what else to check.&lt;/P&gt;&lt;P&gt;@Prakash Chockalingam&amp;nbsp;Can you please help?&lt;/P&gt;&lt;PRE&gt;Dataset&amp;lt;Row&amp;gt; dataDF = spark.readStream()
    .option("sep", delimiter)
    .option("header", "false")
    .option("inferSchema", "false")
    .option("latestFirst", "false")
    .schema(appConfig.getSchema())
    .csv(appConfig.getHdfsSourceDir());

StreamingQuery query = dataDF.writeStream()
    .trigger(ProcessingTime.create(60, TimeUnit.SECONDS))
    .format("parquet")
    .outputMode(OutputMode.Append())
    .partitionBy(appConfig.getPartitionedBy().split(","))
    .option("checkpointLocation", appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir() + "/checkpoint")
    .option("compression", appConfig.getSparkCompressType())
    .start(appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir());

query.awaitTermination();&lt;/PRE&gt;</description>
      <pubDate>Fri, 20 Aug 2021 12:55:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streamingquery-not-processing-all-data-from-source/m-p/16166#M10377</guid>
      <dc:creator>RajaLakshmanan</dc:creator>
      <dc:date>2021-08-20T12:55:04Z</dc:date>
    </item>
    <item>
      <title>Re: Spark StreamingQuery not processing all data from source directory</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streamingquery-not-processing-all-data-from-source/m-p/16168#M10379</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;Did you check the logs for the micro-batch metrics? These metrics will help you identify the number of records processed in each batch and the time it took to process them. This article shows the metrics that are captured and how you can use them to understand how your streaming job is performing: &lt;A href="https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structured-streaming/read#measured_metrics" target="test_blank"&gt;https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structured-streaming/read#measured_metrics&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Thank you&lt;/P&gt;</description>
      <pubDate>Tue, 28 Sep 2021 18:05:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streamingquery-not-processing-all-data-from-source/m-p/16168#M10379</guid>
      <dc:creator>jose_gonzalez</dc:creator>
      <dc:date>2021-09-28T18:05:48Z</dc:date>
    </item>
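A minimal sketch of how the micro-batch metrics mentioned above could be tallied, assuming progress entries shaped like the numInputRows field of Spark's StreamingQueryProgress JSON (as returned by query.recentProgress); the sample numbers below are hypothetical, not taken from the original job:

```python
# Sum numInputRows across micro-batch progress entries and compare
# against the count the producer claims to have sent.
def total_input_rows(progress_entries):
    """Add up numInputRows over a list of progress dicts."""
    return sum(p.get("numInputRows", 0) for p in progress_entries)

# Hypothetical progress snapshots, shaped like query.recentProgress
# entries from Spark Structured Streaming.
recent_progress = [
    {"batchId": 0, "numInputRows": 2_000_000},
    {"batchId": 1, "numInputRows": 1_950_000},
    {"batchId": 2, "numInputRows": 900_000},
]

expected = 5_000_000
processed = total_input_rows(recent_progress)
missing = expected - processed
print(f"processed={processed}, missing={missing}")  # prints "processed=4850000, missing=150000"
```

If the summed numInputRows already falls short of the producer's count, the records never entered the stream (for example, files read before the producer finished writing them); if the sums match, the loss happened on the write side instead.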
    <item>
      <title>Re: Spark StreamingQuery not processing all data from source directory</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streamingquery-not-processing-all-data-from-source/m-p/16169#M10380</link>
      <description>&lt;P&gt;If it helps, you can try running a left-anti join between the source and the sink to identify the missing records, and then check whether those records match the provided schema.&lt;/P&gt;</description>
      <pubDate>Wed, 29 Sep 2021 11:12:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streamingquery-not-processing-all-data-from-source/m-p/16169#M10380</guid>
      <dc:creator>User16763506586</dc:creator>
      <dc:date>2021-09-29T11:12:37Z</dc:date>
    </item>
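The left-anti join suggestion above can be sketched without a cluster; this pure-Python stand-in (all names and sample rows are hypothetical) treats source and sink as lists of records keyed by an id and returns the source rows with no match in the sink, which is the same result a Spark left-anti join on that key would give:

```python
# Pure-Python stand-in for a left-anti join: keep the source rows
# whose key never appears in the sink.
def left_anti_join(source_rows, sink_rows, key):
    sink_keys = {row[key] for row in sink_rows}
    return [row for row in source_rows if row[key] not in sink_keys]

# Hypothetical sample data; in the real job these would be the
# staging-directory input and the parquet output read back.
source = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]
sink = [{"id": 1}, {"id": 2}, {"id": 4}]

missing = left_anti_join(source, sink, "id")
print(missing)  # prints [{'id': 3}]
```

Once the missing rows are isolated this way, inspecting them against the declared schema (bad delimiters, extra columns, malformed values) is usually enough to see why the CSV reader dropped them.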
  </channel>
</rss>

