Try catch multiple write streams on a job

AdamRink
New Contributor III

We are having issues with checkpoints and schema versions getting out of date (no idea why), and it causes jobs to fail. We have jobs that run 15-30 streaming queries, so if one fails, that creates an issue. I would like to trap the checkpoint errors, reset the checkpoint, and log a failure. That's not optimal, because I end up reprocessing the stream, or at least the window we are looking at.

So the problem I have is that the only way I've found to error trap a stream is to use awaitTermination()... but that locks up the notebook, and the next streams won't start until the first stream has terminated. And awaitAnyTermination() won't catch an error that occurs while the job is starting up, because the job stops before it ever reaches the awaitAnyTermination call?
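
For reference, the per-query pattern I'm describing looks roughly like this (a sketch only; the DataFrames, checkpoint paths, and stream names are placeholders):

import org.apache.spark.sql.streaming.StreamingQueryException
 
val query1 = df1.writeStream
  .option("checkpointLocation", "/checkpoints/stream1")
  .start()
 
try {
  // Blocks the notebook here, so query2 below never starts until query1 terminates.
  query1.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    println(s"stream1 failed: ${e.getMessage}") // reset checkpoint / log failure here
}
 
val query2 = df2.writeStream
  .option("checkpointLocation", "/checkpoints/stream2")
  .start()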

1 ACCEPTED SOLUTION


Kaniz
Community Manager

Hi @Adam Rink​ ,

Solution 1:

Since you're calling awaitTermination on the first query, it blocks until that query completes before the second query ever starts.

So if you want to kick off all of the queries, start them all and then use StreamingQueryManager.awaitAnyTermination:

val query1 = df.writeStream.start()
val query2 = df.writeStream.start()
// ...start the remaining queries the same way...
 
spark.streams.awaitAnyTermination()

In addition to the above, Spark uses the FIFO scheduler by default. This means the first query gets all of the cluster's resources while it's executing. Since you're trying to run multiple queries concurrently, you should switch to the FAIR scheduler.

If some queries should get more resources than others, you can also tune the individual scheduler pools.
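
For example (a sketch, not part of the original answer; the pool names, DataFrames, and checkpoint paths are placeholders), the FAIR scheduler is enabled in the cluster's Spark config and each query can then be pinned to its own pool:

// spark.scheduler.mode is read when the cluster starts, so set it in the
// cluster's Spark config rather than at runtime:
//   spark.scheduler.mode FAIR
 
// Each query can then be started in its own scheduler pool:
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
val query1 = df1.writeStream.option("checkpointLocation", "/checkpoints/q1").start()
 
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
val query2 = df2.writeStream.option("checkpointLocation", "/checkpoints/q2").start()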

Solution 2:

val query1 = ds.writeStream.{...}.start()
val query2 = ds.writeStream.{...}.start()
val query3 = ds.writeStream.{...}.start()
 
query3.awaitTermination()

awaitTermination() blocks the calling process until the query finishes, which never happens in a long-running streaming app, so calling it only on the last query should fix your problem.

Solution 3:

If you want to exit when any query terminates, use:

spark.streams.awaitAnyTermination()

If you want to exit only after all queries terminate:

Option 1:

val query1 = ds.writeStream.{...}.start()
val query2 = ds.writeStream.{...}.start()
// ...start the remaining queries...
query1.awaitTermination()
query2.awaitTermination()
// ...await the remaining queries...

Option 2:

val query1 = ds.writeStream.{...}.start()
val query2 = ds.writeStream.{...}.start()
spark.streams.active.foreach(x => x.awaitTermination())

Option 3:

while (!spark.streams.active.isEmpty) {
  println("Queries currently still active: " + spark.streams.active.map(x => x.name).mkString(","))
  spark.streams.awaitAnyTermination()
  spark.streams.resetTerminated()
}
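
Coming back to the original question about trapping checkpoint errors, Option 3 can be combined with a try/catch so that one failing query is logged instead of failing the whole monitoring loop. A minimal sketch (the println stands in for your own logging and checkpoint-reset logic):

import org.apache.spark.sql.streaming.StreamingQueryException
 
while (!spark.streams.active.isEmpty) {
  try {
    spark.streams.awaitAnyTermination()
  } catch {
    case e: StreamingQueryException =>
      // Replace with real logging and any checkpoint-reset handling for the failed query.
      println(s"Streaming query failed: ${e.getMessage}")
  }
  // Forget terminated queries so the next awaitAnyTermination waits for a new termination.
  spark.streams.resetTerminated()
}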


REPLIES

Kaniz
Community Manager

Hi @Adam Rink​ ! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first. Or else I will get back to you soon. Thanks.

AdamRink
New Contributor III

Thx Kaniz


AdamRink
New Contributor III

The problem is that if a stream fails on startup, it would never hit the awaitAnyTermination? I almost want to take that while loop and put it on a background thread started at the beginning, and then fire all the streams afterward... not sure if that is possible?
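
For reference, that idea might look roughly like this in a notebook cell (a sketch only, assuming Scala Futures; the commented-out query and checkpoint path are placeholders):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.streaming.StreamingQueryException
 
// Start the monitoring loop on a background thread first, then start the streams.
val monitor = Future {
  while (true) {
    try {
      spark.streams.awaitAnyTermination()
    } catch {
      case e: StreamingQueryException =>
        println(s"Streaming query failed: ${e.getMessage}") // replace with real logging
    }
    spark.streams.resetTerminated()
  }
}
 
// The streaming queries are then started afterward, e.g.:
// val query1 = df1.writeStream.option("checkpointLocation", "/checkpoints/q1").start()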

Hi @Adam Rink​ ,

Just a friendly follow-up. Do you still need help, or did @Kaniz Fatma's response help resolve your question?
