Data Engineering

I want to run a streaming job from 6 a.m. to 5 p.m. How can I schedule this window in Databricks, or how can I stop my stream at 5 p.m.?

Bhawna_bedi
New Contributor II
 
1 ACCEPTED SOLUTION

Accepted Solutions

Anonymous
Not applicable

You can use the CLI (https://docs.databricks.com/dev-tools/cli/index.html) to schedule a job, and you can also schedule a cluster to terminate through API calls. There are also integrations with tools such as Airflow and Azure Data Factory.

View solution in original post

7 REPLIES

Kaniz
Community Manager

Hi @Bhawna Bedi, this article discusses how to make your streaming application more fault tolerant using Databricks Jobs.

merca
Valued Contributor II

You could set up the schedule to start at 6 AM with a timeout of 39,600 seconds, which is 11 hours, and max retries set to 1. There is a downside: if your stream fails in the middle of the day, the retried run will again time out after 11 hours, so it will keep running past 5 PM regardless.
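In Jobs API terms, that setup would look roughly like the sketch below. This is a hedged fragment, not a complete job spec: the cluster and task definitions are elided, the job name and timezone are illustrative, and the exact placement of max_retries (job-level vs. task-level) varies between Jobs API versions. The Quartz cron expression "0 0 6 * * ?" fires at 06:00 every day.

    {
      "name": "daytime-stream",
      "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",
        "timezone_id": "UTC"
      },
      "timeout_seconds": 39600,
      "max_retries": 1
    }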

Kaniz
Community Manager
Community Manager

Hi @Bhawna Bedi, just a friendly follow-up. Do you still need help, or did you find the solution? Please let us know.

Sandeep
Contributor III

If you are looking for a graceful stop (not stopping exactly at 5, but letting the micro-batch that was in progress at 5 o'clock finish, instead of aborting the stream abruptly), you can try the following. The downside is that if the micro-batch duration is high, the stop will be delayed accordingly.

import java.time.LocalTime
import org.apache.spark.sql.streaming.StreamingQueryListener

val queryStopListener = new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = {
      // No action needed on start
    }
    override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      // No action needed on termination
    }
    override def onQueryProgress(queryProgress: StreamingQueryListener.QueryProgressEvent): Unit = {
      // Called after each completed micro-batch: once the clock is past
      // 17:00, look up the query by its id and stop it gracefully.
      val id = queryProgress.progress.id
      if (LocalTime.now().isAfter(LocalTime.parse("17:00:00"))) {
        val currentStreamingQuery = spark.streams.get(id)
        currentStreamingQuery.stop()
      }
    }
}

// Register this query listener on the session
spark.streams.addListener(queryStopListener)

DimaP
New Contributor II

Does anybody know what will happen if I set a task timeout in Workflows for a streaming job?

merca
Valued Contributor II

If you are streaming to Delta, not much: the micro-batch will fail, and the next run will pick up from the last successful write (thanks to Delta's ACID guarantees). I don't know about other formats, i.e. what happens there if the stream is aborted mid micro-batch.
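As a sketch of why that works (assuming a live Spark session `spark`, a streaming DataFrame `df`, and hypothetical paths): the combination of a checkpoint location and Delta's transaction log is what lets a restarted query resume from the last committed micro-batch without duplicating the one that was aborted.

    // Sketch only: `df` and both paths are illustrative, not from this thread.
    // On restart, progress is recovered from the checkpoint, and Delta's
    // transaction log ensures the aborted micro-batch is not committed twice.
    val query = df.writeStream
      .format("delta")
      .option("checkpointLocation", "/tmp/checkpoints/events")
      .start("/tmp/delta/events")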
