- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
02-06-2022 01:26 PM
Hi folks,
I have an issue. It's not critical but's annoying.
We have implemented a Spark Structured Streaming Application.
This application will be triggered wire Azure Data Factory (every 8 minutes). Ok, this setup sounds a little bit weird and it's not really Streaming, agreed. But the source system is not really real time and we would like to implement a Streaming POC, take a look into deep regarding the technics. That's all.
So, this "Streaming Notebook" runs 24/7. Mostly in a stable way. But sometimes one single load runs into a TimeoutException like this:
java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 26c3f28c-9d17-486a-81be-df418c42cd74, runId = d30a8fe8-3fed-4475-8233-4577b775bb19] failed to stop within 15000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.
This exception is clear. This load is trying to get access into the storage account, where checkpoint is located. Timeout occurs because another load has not finshed the work and locked the checkpoint, yet. In general this Streaming Application doesn't longer take than 1-2 minutes.
But some edge cases need more than 14-15 minutes and blocks other loads that will be started during this long run.
I did some investigation into driver logs and found a strange behavior into the log4j (see attached log4j_snippet.log, 99 sec. duration for what?).
In these edge cases I have a lot of entries like this.
ClusterLoadAvgHelper... what does it mean? Any ideas?
Like I sad, my own small business logic and connect to Azure SQL Database (as sink) does only take 1-2 minutes. In this edge cases where the whole processing time runs to 14-15 minutes more than 10 minutes are necessary for this ClusterLoadAvgHelper stuff.
Currently I have no idea why my cluster is running amok.
Like I sad at the beginning it's not critical, we don't miss any data into the SQL Database. But it's annoying :).
Any ideas would be great.
Thanks in advance,
Markus
Data Source: Auto Loader mechanism like this (https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html)
Sink: Azure SQL Database
Setup
Language: Python3
Databricks Runtime: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)
Driver/Worker type: Standard_E4ds_v4
Cluster mode: Standard
Min Worker: 1 / Max Worker: 10
--- Streaming Notebook Snippet ON ---
# (1) Init Streaming Data Frame
streaming_df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", extension) \
.schema(schema) \
.load(streaming_path) # locate to Storage Account
# (2) Start Streaming
query = (
streaming_df
.writeStream
.foreachBatch(process_batch_for_streaming) # sink into Azure SQL Database
.trigger(once = True)
.option("checkpointLocation", checkpoint_path)
.start()
)
-- Streaming Notebook Snippet OFF ---
- Labels:
-
Databricks Runtime