Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Implementation of a stable Spark Structured Streaming Application

fsm
New Contributor II

Hi folks,

I have an issue. It's not critical, but it's annoying.

We have implemented a Spark Structured Streaming Application.

This application is triggered via Azure Data Factory (every 8 minutes). OK, this setup sounds a little weird and it's not really streaming, agreed. But the source system is not really real time, and we wanted to implement a streaming POC and take a deeper look at the technique. That's all.

So, this "Streaming Notebook" runs 24/7. Mostly in a stable way. But sometimes one single load runs into a TimeoutException like this:

java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 26c3f28c-9d17-486a-81be-df418c42cd74, runId = d30a8fe8-3fed-4475-8233-4577b775bb19] failed to stop within 15000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.

This exception is clear. The load is trying to access the storage account where the checkpoint is located. The timeout occurs because another load has not finished its work yet and still holds the checkpoint lock. In general this streaming application doesn't take longer than 1-2 minutes.

But some edge cases need 14-15 minutes and block the other loads that are started during such a long run.
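A side note not stated in the original post: the exception above names the setting behind the 15-second limit, spark.sql.streaming.stopTimeout, which can be raised at session level. A minimal sketch (this only gives the stop more headroom; it does not remove the overlap between runs):

# Sketch only: allow the streaming query more time to stop before the TimeoutException fires.
# "60s" is an arbitrary example value, not a recommendation.
spark.conf.set("spark.sql.streaming.stopTimeout", "60s")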

I did some investigation in the driver logs and found strange behavior in the log4j output (see the attached log4j_snippet.log; 99 seconds of duration for what?).

In these edge cases I have a lot of entries like this.

ClusterLoadAvgHelper... what does it mean? Any ideas?

Like I said, my own small piece of business logic and the connection to Azure SQL Database (as the sink) take only 1-2 minutes. In these edge cases, where the whole processing time runs to 14-15 minutes, more than 10 minutes are spent on this ClusterLoadAvgHelper stuff.

Currently I have no idea why my cluster is running amok.

Like I said at the beginning, it's not critical; we don't lose any data in the SQL Database. But it's annoying :).

Any ideas would be great.

Thanks in advance,

Markus

Data Source: Auto Loader, as described here (https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html)

Sink: Azure SQL Database

Setup

Language: Python3

Databricks Runtime: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)

Driver/Worker type: Standard_E4ds_v4

Cluster mode: Standard

Min Worker: 1 / Max Worker: 10

--- Streaming Notebook Snippet ON ---

# (1) Init streaming DataFrame
streaming_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", extension) \
    .schema(schema) \
    .load(streaming_path)  # path pointing to the storage account

# (2) Start streaming
query = (
    streaming_df
    .writeStream
    .foreachBatch(process_batch_for_streaming)  # sink into Azure SQL Database
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_path)
    .start()
)

--- Streaming Notebook Snippet OFF ---
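A side note not stated in the thread: start() returns immediately, so the notebook (and the ADF activity that wraps it) can finish while the trigger-once micro-batch is still running, which makes overlapping runs against the same checkpoint more likely. A minimal sketch of waiting for the run to complete:

# Sketch only: block until the trigger-once batch has finished before the notebook returns.
query.awaitTermination()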

1 ACCEPTED SOLUTION


Hi @Markus Freischlad,

Have you tried running your streaming job with a different trigger interval, just to isolate the issue? If you use DBR 10.1+, there is a new trigger that was introduced in that runtime version.

The new trigger is Trigger.AvailableNow. It is like Trigger.Once, which processes all available data and then stops the query. However, Trigger.AvailableNow provides better scalability, because the data can be processed in multiple batches instead of one.

docs here https://docs.databricks.com/release-notes/runtime/10.1.html#triggeravailablenow-for-delta-source-str...
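A sketch of what that change could look like in the notebook from the question, assuming a runtime where the PySpark spelling availableNow=True is supported (it arrived in Spark releases newer than the DBR 8.3 / Spark 3.1.1 used above):

# Sketch only: same query as in the question, but with the newer trigger.
query = (
    streaming_df
    .writeStream
    .foreachBatch(process_batch_for_streaming)  # sink into Azure SQL Database, as before
    .trigger(availableNow=True)                 # process all available data in several batches, then stop
    .option("checkpointLocation", checkpoint_path)
    .start()
)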


4 REPLIES

fsm
New Contributor II

Hi,

I have investigated a little more. Currently I think the ClusterLoadAvgHelper behaviour is not the main problem. It hides this one:

22/02/08 22:52:09 WARN TaskSetManager: Lost task 0.0 in stage 197663.0 (TID 4615729) (10.141.64.7 executor 410): com.microsoft.sqlserver.jdbc.SQLServerException: Connection timed out (Read failed)
   at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
   at com.microsoft.sqlserver.jdbc.SimpleInputStream.getBytes(SimpleInputStream.java:352)
   at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:796)
   at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:3777)
   at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:247)
   at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:190)
   at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2054)
   at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2040)
   at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2511)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12(JdbcUtils.scala:445)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12$adapted(JdbcUtils.scala:443)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:353)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:335)

I will do further checks on the Azure SQL Database (statistics, re-org runs, ...) in combination with the sql-spark-connector. Maybe there is a conflict between the streaming writes and the stats/reorg jobs.

Furthermore, I will extend the log4j config to get more details about com.microsoft.sqlserver.jdbc.*
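A minimal sketch of what that could look like from a notebook cell, assuming the log4j 1.x API that DBR 8.3 ships with (not the actual config change described above):

# Sketch only: raise the log level for the SQL Server JDBC driver on the driver JVM.
log4j = spark.sparkContext._jvm.org.apache.log4j
log4j.LogManager.getLogger("com.microsoft.sqlserver.jdbc").setLevel(log4j.Level.DEBUG)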

Thanks,

Markus


Anonymous
Not applicable

@fsm - Thank you for the extra information! 🙂

brickster_2018
Esteemed Contributor

@Markus Freischlad Looks like the Spark driver was stuck. It would be good to capture a thread dump of the Spark driver to understand what operation is stuck.
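A minimal sketch of one way to capture such a dump from a notebook cell, offered as an assumption on top of this reply (the Spark UI's Executors tab also exposes a Thread Dump link for the driver):

# Sketch only: print the driver JVM's thread stacks via py4j.
traces = spark.sparkContext._jvm.java.lang.Thread.getAllStackTraces()
for thread, stack in traces.items():
    print(thread.getName())
    for frame in stack:
        print("    at " + frame.toString())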
