02-06-2022 01:26 PM
Hi folks,
I have an issue. It's not critical, but it's annoying.
We have implemented a Spark Structured Streaming application.
This application is triggered via Azure Data Factory (every 8 minutes). OK, this setup sounds a little bit weird and it's not really streaming, agreed. But the source system is not really real-time, and we would like to implement a streaming POC and take a deeper look at the techniques. That's all.
So, this "Streaming Notebook" runs 24/7, mostly in a stable way. But sometimes a single load runs into a TimeoutException like this:
java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 26c3f28c-9d17-486a-81be-df418c42cd74, runId = d30a8fe8-3fed-4475-8233-4577b775bb19] failed to stop within 15000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.
This exception is clear. The load is trying to access the storage account where the checkpoint is located. The timeout occurs because another load has not finished its work yet and still has the checkpoint locked. In general, this streaming application doesn't take longer than 1-2 minutes.
But some edge cases need more than 14-15 minutes and block other loads that are started during this long run.
I did some investigation in the driver logs and found strange behavior in the log4j output (see attached log4j_snippet.log, 99 sec. duration for what?).
In these edge cases I have a lot of entries like this.
ClusterLoadAvgHelper... what does it mean? Any ideas?
Like I said, my own small business logic and the connection to Azure SQL Database (as sink) only take 1-2 minutes. In the edge cases where the whole processing time runs to 14-15 minutes, more than 10 minutes are spent on this ClusterLoadAvgHelper stuff.
Currently I have no idea why my cluster is running amok.
Like I said at the beginning, it's not critical; we don't miss any data in the SQL Database. But it's annoying :).
Any ideas would be great.
Thanks in advance,
Markus
Data Source: Auto Loader mechanism like this (https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html)
Sink: Azure SQL Database
Setup
Language: Python3
Databricks Runtime: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)
Driver/Worker type: Standard_E4ds_v4
Cluster mode: Standard
Min Worker: 1 / Max Worker: 10
--- Streaming Notebook Snippet ON ---
# (1) Init Streaming Data Frame
streaming_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", extension) \
    .schema(schema) \
    .load(streaming_path)  # locate to Storage Account
# (2) Start Streaming
query = (
    streaming_df
    .writeStream
    .foreachBatch(process_batch_for_streaming)  # sink into Azure SQL Database
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_path)
    .start()
)
--- Streaming Notebook Snippet OFF ---
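To separate the time spent inside the foreachBatch function from the rest of the run (the post reports 1-2 minutes of own logic versus 14-15 minutes total in the edge cases), one option is a small timing wrapper around the batch function. This is only a sketch: `timed_batch` and the logger name `streaming_timing` are hypothetical names, not part of the original notebook.

```python
import functools
import logging
import time

# Hypothetical logger name, chosen for this sketch only.
logger = logging.getLogger("streaming_timing")


def timed_batch(batch_fn):
    """Wrap a foreachBatch function and record how long each call takes."""

    @functools.wraps(batch_fn)
    def wrapper(batch_df, batch_id):
        start = time.monotonic()
        try:
            return batch_fn(batch_df, batch_id)
        finally:
            # Runs on success and on failure, so slow failing batches are recorded too.
            elapsed = time.monotonic() - start
            logger.info("batch %s finished in %.1f s", batch_id, elapsed)
            wrapper.durations.append((batch_id, elapsed))

    # List of (batch_id, seconds) tuples, kept in the driver process.
    wrapper.durations = []
    return wrapper
```

In the notebook, the wrapped function would be passed as `.foreachBatch(timed_batch(process_batch_for_streaming))`. If the logged batch durations stay at 1-2 minutes while the whole run takes 14-15 minutes, the extra time is being spent outside the batch function.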
03-22-2022 09:33 AM
Hi @Markus Freischlad,
Have you tried running your streaming job with a different trigger interval? Just to isolate the issue. If you use DBR 10.1+, there is a new trigger that was introduced in this DBR version.
The new trigger is "Trigger.AvailableNow". It is like Trigger.Once, which processes all available data and then stops the query. However, Trigger.AvailableNow provides better scalability, because data can be processed in multiple batches instead of one.
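A minimal Python sketch of switching between the two triggers, assuming PySpark's `trigger(availableNow=True)` keyword is available on the runtime in use. The helper names `trigger_options` and `start_query` and the `min_available_now` parameter are hypothetical. The default threshold of 10.1 is taken from this reply and should be checked against the release notes for the Python API of your runtime.

```python
def trigger_options(dbr_version, min_available_now=(10, 1)):
    """Return keyword arguments for DataStreamWriter.trigger().

    dbr_version is a string such as "8.3" or "10.4"; min_available_now is an
    assumed threshold (from the reply, not verified here for Python).
    """
    major, minor = (int(part) for part in dbr_version.split(".")[:2])
    if (major, minor) >= tuple(min_available_now):
        # Processes all available data, possibly in several batches, then stops.
        return {"availableNow": True}
    # Fallback: one single batch, as in the original notebook.
    return {"once": True}


def start_query(streaming_df, batch_fn, checkpoint_path, dbr_version):
    """Start the stream with whichever run-then-stop trigger the runtime supports."""
    return (
        streaming_df.writeStream
        .foreachBatch(batch_fn)
        .trigger(**trigger_options(dbr_version))
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

On the runtime from the original post (8.3), `trigger_options("8.3")` falls back to `{"once": True}`, so the behavior stays unchanged until the cluster is upgraded.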
02-10-2022 12:39 AM
Hi,
I have investigated a little bit more. Currently I think the ClusterLoadAvgHelper behaviour is not the main problem. This behaviour hides this one:
22/02/08 22:52:09 WARN TaskSetManager: Lost task 0.0 in stage 197663.0 (TID 4615729) (10.141.64.7 executor 410): com.microsoft.sqlserver.jdbc.SQLServerException: Connection timed out (Read failed)
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
at com.microsoft.sqlserver.jdbc.SimpleInputStream.getBytes(SimpleInputStream.java:352)
at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:796)
at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:3777)
at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:247)
at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:190)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2054)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2040)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2511)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12(JdbcUtils.scala:445)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12$adapted(JdbcUtils.scala:443)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:353)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:335)
I will do further checks on Azure SQL Database (stats, re-org runs, ...) combined with sql-spark-connector. Maybe there is a conflict between streaming and stats/reorg.
Furthermore, I will extend the log4j config to get more details about
com.microsoft.sqlserver.jdbc.*
Thanks,
Markus
02-10-2022 06:57 AM
@fsm - Thank you for the extra information!
03-14-2022 06:24 PM
@Markus Freischlad Looks like the Spark driver was stuck. It would be good to capture a thread dump of the Spark driver to understand which operation is stuck.