Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Graceful shutdown - stopping stream at the end of microbatch

itt
New Contributor II

I'm trying to create a system where I let Spark finish the current microbatch and let it know it should stop right after.

The reason is that I don't want to re-calculate a microbatch after "forcefully" stopping a stream.

Is there a way Spark/Databricks already implements this?

My current approach is just raising an exception at the start of the microbatch if an indication of a graceful stop was given, roughly like the sketch below.
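
Something like this (a simplified sketch; the names are just placeholders):

stop_requested = False  # set to True by some external signal

class GracefulStopRequested(Exception):
    pass

def process_batch(df, batch_id):
    if stop_requested:
        # Abort before doing any work, so the batch is never half-processed.
        raise GracefulStopRequested(f"stop requested before batch {batch_id}")
    df.count()  # placeholder for the real processing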

Thanks

3 REPLIES

itt
New Contributor II

Hey, thanks for the reply.

It still seems to cut off in the middle of the stream.

As a simple test, I created the following:

import time

onend = -1
on_entry = -1

def foreachbatch(df, df_id):
    # Record which batch we entered, simulate slow work, then record
    # which batch we finished; if the stream is cut mid-batch, "onend"
    # will lag behind "on_entry".
    global onend
    global on_entry
    on_entry = df_id
    time.sleep(5)
    df.count()
    dff = spark.table("some_table").limit(1)
    dff.write.insertInto("some_table", overwrite=False)
    onend = df_id

spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

stream_reader = (
    spark.readStream.format("delta")
    .option("ignoreChanges", "true")
    # maxBytesPerTrigger is a Delta *source* option, so it belongs on the reader
    .option("maxBytesPerTrigger", 1)
    .option("spark.streaming.stopGracefullyOnShutdown", "true")
)
stream_reader = stream_reader.table("some_table")

writer = (
    stream_reader.writeStream.trigger(processingTime="1 seconds")
    .queryName("thestream")
    .outputMode("append")
    .format("delta")
    .foreachBatch(foreachbatch)
)

query = writer.start()
 
 
and to stop I do:

query.stop()

And the value of "onend" never changes.
Of course this is just for testing, but it still looks like the stream does not stop gracefully.

NandiniN
Databricks Employee

Checking.

mark_ott
Databricks Employee

There is no built-in Spark or Databricks method to gracefully stop a Structured Streaming query specifically at the end of the current microbatch, but several community and expert discussions propose common strategies to achieve this:

Official and Common Approaches

  • The standard StreamingQuery.stop() method stops the execution thread and cancels jobs, but it does not guarantee waiting for the current microbatch to finish: all jobs are cancelled at once, so in-flight microbatch processing may be interrupted, which can result in partial work or the need to recompute on restart.

  • For traditional Spark Streaming (DStreams, not Structured Streaming), there is a graceful stop option, StreamingContext.stop(stopSparkContext, stopGracefully), which waits for all received data to be processed. However, this is not available for Structured Streaming; a sketch follows below.
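
For reference, a minimal DStreams sketch (assuming an existing SparkContext sc; again, this API does not apply to Structured Streaming):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=10)
# ... define DStream sources and transformations here ...
ssc.start()
# Graceful stop: finish processing all received data, then shut down.
ssc.stop(True, True)  # stopSparkContext=True, stopGracefully=True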

Community Solutions

  • A common workaround is to signal a graceful shutdown externally (e.g., by creating a marker file or listening on a socket) and then wait for the current trigger (microbatch) to complete before calling stop() on the StreamingQuery object. Do this outside the microbatch processing code: stopping the query from inside the microbatch can deadlock, so the batch never finishes.

  • Another method is to periodically check whether the stream's status (StreamingQuery.status.isTriggerActive) is false, which indicates that no microbatch is currently running, and only then call .stop() once an external termination signal has been received. The Example Approach below combines both ideas.

Example Approach

  • Use an external mechanism (file, API, REST endpoint) to signal when a graceful stop is required.

  • Monitor the status of the streaming query in a separate thread or loop.

  • After receiving the stop signal, wait until the current microbatch has finished (isTriggerActive == false), then call .stop(), as in the sketch below.
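
A minimal PySpark sketch of this pattern (the marker-file path and polling intervals are illustrative assumptions; query is the handle returned by writer.start()):

import os
import time

STOP_MARKER = "/tmp/stop_thestream"  # hypothetical signal file

def stop_gracefully(query, poll_seconds=5):
    # Wait for an external stop signal, then stop the query only
    # when no trigger (microbatch) is currently active.
    while query.isActive:
        if os.path.exists(STOP_MARKER):
            while query.status["isTriggerActive"]:
                time.sleep(1)  # let the in-flight microbatch finish
            query.stop()
            return
        time.sleep(poll_seconds)

stop_gracefully(query)

Note that a small window remains between observing isTriggerActive == false and calling stop() in which a new trigger could start; keeping the polling interval short relative to the trigger interval keeps that window small.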

Why Not Raise Exceptions

  • Raising an exception inside the microbatch task can interrupt it mid-processing, risking partial computation, data duplication, or data loss, which is why best practice is to monitor and stop the query externally.

Summary Table: Structured Streaming Stop Methods

Method                        Waits for Microbatch End   Risk of Recomputing   Recommended Usage
StreamingQuery.stop()         No                         Yes                   Only for hard stop
External status monitoring    Yes (if trigger ended)     No                    Graceful shutdown signal
Exception in microbatch code  No                         Yes                   Not recommended

Consult the Spark or Databricks documentation for updates, as enhancements may eventually address this scenario natively. For immediate best results, use external coordination and status monitoring as described above.