<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Topic: Implementation of a stable Spark Structured Streaming Application in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29346#M21080</link>
    <description>&lt;P&gt;Hi folks,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have an issue. It's not critical, but it's annoying.&lt;/P&gt;&lt;P&gt;We have implemented a Spark Structured Streaming application.&lt;/P&gt;&lt;P&gt;This application is triggered via Azure Data Factory (every 8 minutes). OK, this setup sounds a little weird and it's not really streaming, agreed. But the source system is not really real time, and we would like to implement a streaming POC and take a deeper look at the technique. That's all.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;So, this "Streaming Notebook" runs 24/7, mostly in a stable way. But sometimes a single load runs into a TimeoutException like this:&lt;/P&gt;&lt;P&gt;java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 26c3f28c-9d17-486a-81be-df418c42cd74, runId = d30a8fe8-3fed-4475-8233-4577b775bb19] failed to stop within 15000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;This exception is clear: the load is trying to access the storage account where the checkpoint is located. The timeout occurs because another load has not finished its work yet and still holds the checkpoint lock. In general this streaming application doesn't take longer than 1-2 minutes.&lt;/P&gt;&lt;P&gt;But some edge cases need more than 14-15 minutes and block other loads that are started during this long run.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I did some investigation in the driver logs and found strange behavior in the log4j output (see the attached log4j_snippet.log; a 99 sec. duration for what?).&lt;/P&gt;&lt;P&gt;In these edge cases I have a lot of entries like this.&lt;/P&gt;&lt;P&gt;ClusterLoadAvgHelper... what does it mean? Any ideas?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Like I said, my own small business logic and the connection to the Azure SQL Database (as sink) only take 1-2 minutes.
In these edge cases, where the whole processing time runs to 14-15 minutes, more than 10 minutes are spent on this ClusterLoadAvgHelper stuff.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Currently I have no idea why my cluster is running amok.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Like I said at the beginning, it's not critical; we don't miss any data in the SQL Database. But it's annoying :).&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Any ideas would be great.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks in advance,&lt;/P&gt;&lt;P&gt; Markus&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Data source: Auto Loader mechanism like this (https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html)&lt;/P&gt;&lt;P&gt;Sink: Azure SQL Database&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Setup&lt;/P&gt;&lt;P&gt;Language: Python 3&lt;/P&gt;&lt;P&gt;Databricks Runtime: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)&lt;/P&gt;&lt;P&gt;Driver/Worker type: Standard_E4ds_v4&lt;/P&gt;&lt;P&gt;Cluster mode: Standard&lt;/P&gt;&lt;P&gt;Min workers: 1 / Max workers: 10&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;--- Streaming Notebook Snippet ON ---&lt;/P&gt;&lt;P&gt;# (1) Init the streaming DataFrame&lt;/P&gt;&lt;P&gt;streaming_df = spark.readStream.format("cloudFiles") \&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("cloudFiles.format", extension) \&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.schema(schema) \&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.load(streaming_path)&amp;nbsp;# points to the storage account&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;# (2) Start the streaming query&lt;/P&gt;&lt;P&gt;query = (&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;streaming_df&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.writeStream&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.foreachBatch(process_batch_for_streaming)&amp;nbsp;# sink into Azure SQL Database&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.trigger(once=True)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("checkpointLocation", checkpoint_path)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.start()&lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;--- Streaming Notebook Snippet OFF ---&lt;/P&gt;</description>
    <pubDate>Sun, 06 Feb 2022 21:26:11 GMT</pubDate>
    <dc:creator>fsm</dc:creator>
    <dc:date>2022-02-06T21:26:11Z</dc:date>
    <item>
      <title>Implementation of a stable Spark Structured Streaming Application</title>
      <link>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29346#M21080</link>
      <description>&lt;P&gt;Hi folks,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have an issue. It's not critical, but it's annoying.&lt;/P&gt;&lt;P&gt;We have implemented a Spark Structured Streaming application.&lt;/P&gt;&lt;P&gt;This application is triggered via Azure Data Factory (every 8 minutes). OK, this setup sounds a little weird and it's not really streaming, agreed. But the source system is not really real time, and we would like to implement a streaming POC and take a deeper look at the technique. That's all.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;So, this "Streaming Notebook" runs 24/7, mostly in a stable way. But sometimes a single load runs into a TimeoutException like this:&lt;/P&gt;&lt;P&gt;java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 26c3f28c-9d17-486a-81be-df418c42cd74, runId = d30a8fe8-3fed-4475-8233-4577b775bb19] failed to stop within 15000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;This exception is clear: the load is trying to access the storage account where the checkpoint is located. The timeout occurs because another load has not finished its work yet and still holds the checkpoint lock. In general this streaming application doesn't take longer than 1-2 minutes.&lt;/P&gt;&lt;P&gt;But some edge cases need more than 14-15 minutes and block other loads that are started during this long run.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I did some investigation in the driver logs and found strange behavior in the log4j output (see the attached log4j_snippet.log; a 99 sec. duration for what?).&lt;/P&gt;&lt;P&gt;In these edge cases I have a lot of entries like this.&lt;/P&gt;&lt;P&gt;ClusterLoadAvgHelper... what does it mean? Any ideas?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Like I said, my own small business logic and the connection to the Azure SQL Database (as sink) only take 1-2 minutes.
In these edge cases, where the whole processing time runs to 14-15 minutes, more than 10 minutes are spent on this ClusterLoadAvgHelper stuff.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Currently I have no idea why my cluster is running amok.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Like I said at the beginning, it's not critical; we don't miss any data in the SQL Database. But it's annoying :).&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Any ideas would be great.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks in advance,&lt;/P&gt;&lt;P&gt; Markus&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Data source: Auto Loader mechanism like this (https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html)&lt;/P&gt;&lt;P&gt;Sink: Azure SQL Database&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Setup&lt;/P&gt;&lt;P&gt;Language: Python 3&lt;/P&gt;&lt;P&gt;Databricks Runtime: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)&lt;/P&gt;&lt;P&gt;Driver/Worker type: Standard_E4ds_v4&lt;/P&gt;&lt;P&gt;Cluster mode: Standard&lt;/P&gt;&lt;P&gt;Min workers: 1 / Max workers: 10&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;--- Streaming Notebook Snippet ON ---&lt;/P&gt;&lt;P&gt;# (1) Init the streaming DataFrame&lt;/P&gt;&lt;P&gt;streaming_df = spark.readStream.format("cloudFiles") \&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("cloudFiles.format", extension) \&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.schema(schema) \&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.load(streaming_path)&amp;nbsp;# points to the storage account&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;# (2) Start the streaming query&lt;/P&gt;&lt;P&gt;query = (&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;streaming_df&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.writeStream&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.foreachBatch(process_batch_for_streaming)&amp;nbsp;# sink into Azure SQL Database&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.trigger(once=True)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.option("checkpointLocation", checkpoint_path)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;.start()&lt;/P&gt;&lt;P&gt;)&lt;/P&gt;&lt;P&gt;--- Streaming Notebook Snippet OFF ---&lt;/P&gt;</description>
      <pubDate>Sun, 06 Feb 2022 21:26:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29346#M21080</guid>
      <dc:creator>fsm</dc:creator>
      <dc:date>2022-02-06T21:26:11Z</dc:date>
    </item>
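    <!-- The overlapping-run TimeoutException described above can be softened on the caller side: if a start attempt fails because a previous run still holds the checkpoint lock, retry after a short backoff instead of failing the ADF pipeline. A minimal sketch in plain Python, assuming `start_stream` is a hypothetical stand-in for whatever starts the streaming query; in a real notebook the Java TimeoutException would surface through the Py4J bridge, so plain TimeoutError is used here only to keep the sketch self-contained:

```python
import time


def start_with_retry(start_stream, retries=3, base_delay=30.0):
    """Retry a stream start that can fail while a previous run
    still holds the checkpoint lock.

    start_stream: zero-argument callable that starts the query
    (hypothetical; not a Databricks API).
    """
    for attempt in range(retries):
        try:
            return start_stream()
        except TimeoutError:
            if attempt == retries - 1:
                raise  # give up after the last attempt
            # exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            time.sleep(base_delay * 2 ** attempt)
```

Usage would be `start_with_retry(lambda: query.awaitTermination())` or similar; since the job only takes 1 to 2 minutes normally, a 30-second base delay lets the previous run finish in most cases.
-->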
    <item>
      <title>Re: Implementation of a stable Spark Structured Streaming Application</title>
      <link>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29348#M21082</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I have investigated a little bit more. Currently I think the ClusterLoadAvgHelper behavior is not the main problem; it hides this one:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;22/02/08 22:52:09 WARN TaskSetManager: Lost task 0.0 in stage 197663.0 (TID 4615729) (10.141.64.7 executor 410): com.microsoft.sqlserver.jdbc.SQLServerException: Connection timed out (Read failed)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.SimpleInputStream.getBytes(SimpleInputStream.java:352)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:796)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:3777)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:247)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:190)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2054)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2040)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2511)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12(JdbcUtils.scala:445)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12$adapted(JdbcUtils.scala:443)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:353)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:335)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I will do further checks in the Azure SQL Database (stats, re-org runs, ...) combined with the sql-spark-connector. Maybe there is a conflict between the streaming job and the stats/re-org maintenance.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Furthermore, I will extend the log4j config to get more details about&lt;/P&gt;&lt;P&gt;com.microsoft.sqlserver.jdbc.*&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks,&lt;/P&gt;&lt;P&gt; Markus&lt;/P&gt;</description>
      <pubDate>Thu, 10 Feb 2022 08:39:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29348#M21082</guid>
      <dc:creator>fsm</dc:creator>
      <dc:date>2022-02-10T08:39:22Z</dc:date>
    </item>
    <item>
      <title>Re: Implementation of a stable Spark Structured Streaming Application</title>
      <link>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29349#M21083</link>
      <description>&lt;P&gt;@fsm - Thank you for the extra information! &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt; &lt;/P&gt;</description>
      <pubDate>Thu, 10 Feb 2022 14:57:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29349#M21083</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2022-02-10T14:57:21Z</dc:date>
    </item>
    <item>
      <title>Re: Implementation of a stable Spark Structured Streaming Application</title>
      <link>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29350#M21084</link>
      <description>&lt;P&gt;@Markus Freischlad​&amp;nbsp;It looks like the Spark driver was stuck. It would be good to capture a thread dump of the Spark driver to understand which operation is stuck.&lt;/P&gt;</description>
      <pubDate>Tue, 15 Mar 2022 01:24:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29350#M21084</guid>
      <dc:creator>brickster_2018</dc:creator>
      <dc:date>2022-03-15T01:24:49Z</dc:date>
    </item>
    <item>
      <title>Re: Implementation of a stable Spark Structured Streaming Application</title>
      <link>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29351#M21085</link>
      <description>&lt;P&gt;Hi @Markus Freischlad​&amp;nbsp;,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Have you tried running your streaming job with a different trigger interval, just to isolate the issue? If you use DBR 10.1+, there is a new trigger that was introduced in that DBR version.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The new trigger is Trigger.AvailableNow. It is like Trigger.Once, which processes all available data and then stops the query. However, Trigger.AvailableNow provides better scalability, because the data can be processed in multiple batches instead of one.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Docs here: &lt;A href="https://docs.databricks.com/release-notes/runtime/10.1.html#triggeravailablenow-for-delta-source-streaming-queries" target="_blank"&gt;https://docs.databricks.com/release-notes/runtime/10.1.html#triggeravailablenow-for-delta-source-streaming-queries&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 22 Mar 2022 16:33:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implementation-of-a-stable-spark-structured-streaming/m-p/29351#M21085</guid>
      <dc:creator>jose_gonzalez</dc:creator>
      <dc:date>2022-03-22T16:33:36Z</dc:date>
    </item>
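    <!-- The Trigger.AvailableNow suggestion above would only change the trigger line of the original notebook snippet. A configuration sketch, not a runnable job: it assumes a DBR version whose PySpark exposes `availableNow` (upstream Spark added it in 3.3), and `streaming_df`, `process_batch_for_streaming`, and `checkpoint_path` come from the original post:

```python
# Sketch: same writeStream chain as the post, with the trigger swapped.
query = (
    streaming_df
    .writeStream
    .foreachBatch(process_batch_for_streaming)  # sink into Azure SQL Database
    .trigger(availableNow=True)                 # replaces .trigger(once=True)
    .option("checkpointLocation", checkpoint_path)
    .start()
)
# Blocking until this run finishes may also help avoid the checkpoint-lock
# contention between overlapping ADF-triggered runs described in the thread.
query.awaitTermination()
```
-->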
  </channel>
</rss>

