
Implementation of a stable Spark Structured Streaming Application

fsm
New Contributor II

Hi folks,

I have an issue. It's not critical, but it's annoying.

We have implemented a Spark Structured Streaming Application.

This application is triggered via Azure Data Factory (every 8 minutes). OK, this setup sounds a little bit weird and it's not really streaming, agreed. But the source system is not really real time, and we wanted to implement a streaming POC and take a deeper look at the technique. That's all.

So, this "Streaming Notebook" runs 24/7. Mostly in a stable way. But sometimes one single load runs into a TimeoutException like this:

java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 26c3f28c-9d17-486a-81be-df418c42cd74, runId = d30a8fe8-3fed-4475-8233-4577b775bb19] failed to stop within 15000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.

This exception is clear. The load is trying to access the storage account where the checkpoint is located. The timeout occurs because another load has not finished its work yet and still holds the checkpoint lock. In general this streaming application doesn't take longer than 1-2 minutes.

But some edge cases need more than 14-15 minutes and block other loads that are started during this long run.
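As a side note, the 15000 ms in that exception comes from spark.sql.streaming.stopTimeout, so one stop-gap I could try is raising it while the long runs are investigated. A minimal sketch, assuming the conf is session-settable on this runtime; it only hides the overlap between runs, it doesn't fix the slow run itself:

# Stop-gap only: give the stream execution thread more time to stop before the
# TimeoutException above is thrown. The cluster here uses 15000 ms, per the exception.
spark.conf.set("spark.sql.streaming.stopTimeout", "60s")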

I did some investigation into the driver logs and found strange behavior in the log4j output (see the attached log4j_snippet.log; 99 seconds duration for what?).

In these edge cases I have a lot of entries like this.

ClusterLoadAvgHelper... what does it mean? Any ideas?

Like I said, my own small bit of business logic plus the connection to Azure SQL Database (as sink) only takes 1-2 minutes. In these edge cases, where the whole processing time runs up to 14-15 minutes, more than 10 minutes are spent on this ClusterLoadAvgHelper stuff.

Currently I have no idea why my cluster is running amok.

Like I said at the beginning, it's not critical; we don't miss any data in the SQL Database. But it's annoying :).

Any ideas would be great.

Thanks in advance,

Markus

Data Source: Auto Loader mechanism like this (https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html)

Sink: Azure SQL Database

Setup

Language: Python3

Databricks Runtime: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)

Driver/Worker type: Standard_E4ds_v4

Cluster mode: Standard

Min Worker: 1 / Max Worker: 10

--- Streaming Notebook Snippet ON ---

# (1) Init streaming DataFrame with Auto Loader
streaming_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", extension) \
    .schema(schema) \
    .load(streaming_path)  # points to the Storage Account

# (2) Start streaming
query = (
    streaming_df
    .writeStream
    .foreachBatch(process_batch_for_streaming)  # sink into Azure SQL Database
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_path)
    .start()
)

--- Streaming Notebook Snippet OFF ---
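The body of process_batch_for_streaming is not shown here. Just for context, a rough sketch (not our actual code) of what such a batch handler could look like with the Microsoft sql-spark-connector mentioned further down, with jdbc_url, target_table, sql_user and sql_password as placeholders:

# Hypothetical sketch only - the real handler is not part of this post.
# Assumes the Microsoft sql-spark-connector is installed on the cluster;
# jdbc_url, target_table, sql_user and sql_password are placeholders.
def process_batch_for_streaming(batch_df, batch_id):
    (batch_df.write
        .format("com.microsoft.sqlserver.jdbc.spark")
        .mode("append")
        .option("url", jdbc_url)
        .option("dbtable", target_table)
        .option("user", sql_user)
        .option("password", sql_password)
        .save())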

1 ACCEPTED SOLUTION

Hi @Markus Freischlad​ ,

Have you tried running your streaming job with a different trigger interval, just to isolate the issue? If you use DBR 10.1+, there is a new trigger that was introduced in that DBR version.

The new trigger is Trigger.AvailableNow. It is like Trigger.Once, which processes all available data and then stops the query. However, Trigger.AvailableNow provides better scalability, because the data can be processed in multiple batches instead of one.

docs here https://docs.databricks.com/release-notes/runtime/10.1.html#triggeravailablenow-for-delta-source-str...
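For illustration, a minimal sketch of the same writeStream with the new trigger, assuming your runtime's PySpark already accepts the availableNow flag (on DBR 10.1 itself the docs show the Scala Trigger.AvailableNow, so check which form your DBR supports):

# Same pipeline as in the question, with Trigger.AvailableNow instead of Trigger.Once.
# Assumes the PySpark API on your DBR accepts availableNow=True.
query = (
    streaming_df
    .writeStream
    .foreachBatch(process_batch_for_streaming)   # sink into Azure SQL Database
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)                  # process all available data in multiple batches, then stop
    .start()
)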


5 REPLIES

Kaniz
Community Manager

Hi @Markus Freischlad! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first; otherwise I will get back to you soon. Thanks.

fsm
New Contributor II

Hi,

I have investigated a little bit more. Currently I think the ClusterLoadAvgHelper behaviour is not the main problem; it hides this one:

22/02/08 22:52:09 WARN TaskSetManager: Lost task 0.0 in stage 197663.0 (TID 4615729) (10.141.64.7 executor 410): com.microsoft.sqlserver.jdbc.SQLServerException: Connection timed out (Read failed)
   at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
   at com.microsoft.sqlserver.jdbc.SimpleInputStream.getBytes(SimpleInputStream.java:352)
   at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:796)
   at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:3777)
   at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:247)
   at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:190)
   at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2054)
   at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:2040)
   at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2511)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12(JdbcUtils.scala:445)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$12$adapted(JdbcUtils.scala:443)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:353)
   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:335)

I will do further checks on the Azure SQL Database (stats, re-org runs, ...) in combination with the sql-spark-connector. Maybe there is a conflict between the streaming writes and the stats/reorg.

Furthermore, I will extend the log4j config to get more details about

com.microsoft.sqlserver.jdbc.*
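A rough sketch of how I might do that from a Python cell, assuming the driver's log4j 1.x API is reachable via sc._jvm (whether the MSSQL JDBC driver's java.util.logging output actually ends up in log4j depends on the runtime's logging bridge, so this is a starting point rather than a guarantee):

# Sketch: raise the log level for the JDBC driver package on the driver's log4j.
# Assumes DBR's log4j 1.x is reachable through the py4j gateway (sc._jvm).
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getLogger("com.microsoft.sqlserver.jdbc").setLevel(log4j.Level.DEBUG)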

Thanks,

Markus


Anonymous
Not applicable

@fsm - Thank you for the extra information! 🙂

User16869510359
Esteemed Contributor

@Markus Freischlad Looks like the Spark driver was stuck. It would be good to capture a thread dump of the Spark driver to understand which operation is stuck.
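The driver thread dump is available from the Spark UI (Executors tab > driver > Thread Dump). As an alternative, a quick sketch to print the driver JVM's stack traces straight from a Python cell via the py4j gateway (an internal hook, not a public API, so only for ad-hoc diagnostics):

# Print all driver JVM thread stack traces from a notebook cell (diagnostic hack).
traces = sc._jvm.java.lang.Thread.getAllStackTraces()
for thread in traces.keySet():
    print(thread.getName(), "-", thread.getState().toString())
    for frame in traces.get(thread):
        print("    at", frame.toString())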
