cancel
Showing results for 
Search instead for 
Did you mean: 
Administration & Architecture
Explore discussions on Databricks administration, deployment strategies, and architectural best practices. Connect with administrators and architects to optimize your Databricks environment for performance, scalability, and security.
cancel
Showing results for 
Search instead for 
Did you mean: 

Stream Query termination using available now trigger and toTable.

Kutbuddin
New Contributor II

We are running a streaming job in databricks with custom streaming logic which consumes a CDC stream from mongo and appends to a delta table, at the end of the streaming job we have a internal checkpointing logic which creates an entry into a table with timestamp of query start. 
We are seeing diff in the checkpointing logic insertion time and the time where the query actually finishes.

What could be the reason for this?
Can a diff command taken up for execution before the streaming query finishes?

1 ACCEPTED SOLUTION

Accepted Solutions

raphaelblg
Honored Contributor II
Honored Contributor II

@Kutbuddin this is true, streaming queries are asynchronous and won't delay your code execution if not awaited for.

You can see in the example below that right after the query starts, the status is 'Initializing sources':

raphaelblg_0-1716922667104.png

But when using awaitTermination() the next row of code will only be executed after the query finishes:

raphaelblg_1-1716922758147.png

 

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

View solution in original post

5 REPLIES 5

raphaelblg
Honored Contributor II
Honored Contributor II

Hello @Kutbuddin,

Can you share more details? Any source code example?

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

Kutbuddin
New Contributor II

Sharing code example:
1. Read stream from mongo
2. Write stream to delta table.
3. Insert into a internal checkpoint table.

# COMMAND 
query_cdc = (
spark.readStream.format("mongodb")
.option(
"spark.mongodb.connection.uri",
f"mongodb://{mongodb_user}:{mongodb_password}@{mongodb_host}/{source_database}.{source_collection}?authSource={mongodb_authsource}",
)
.option("spark.mongodb.database", source_database)
.option("spark.mongodb.collection", source_collection)
.option("spark.mongodb.change.stream.lookup.full.document", "updateLookup")
.option("spark.mongodb.read.aggregation.pipeline", "")
.schema(
{OurSchema}
)
.option("forceDeleteTempCheckpointLocation", "false")
.option("outputExtendedJson", "true")
.load()
)
# COMMAND
dt=int(datetime.datetime.strftime(datetime.datetime.now(),'%Y%m%d%H%M%S'))

query=(query_cdc.withColumn("ingestionTime", lit(dt)).writeStream \
.trigger(availableNow=True)
.partitionBy("ingestionTime")
.option("mergeSchema", "true")
.outputMode("append")
.option("checkpointLocation", target_bronze_checkpoint)
.toTable(f"{target_bronze_database}.{target_bronze_table}"));
 
 
# COMMAND
spark.sql(f"insert into table {internal_tab_name} values({dt})")

raphaelblg
Honored Contributor II
Honored Contributor II

@Kutbuddin What are you expecting to achieve? which variable is off and why?

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

Kutbuddin
New Contributor II

I was expecting spark.sql(f"insert into table {internal_tab_name} values({dt})") to execute at the end after the streaming query was written to the table. 

What I observed:
The spark sql query spark.sql(f"insert into table {internal_tab_name} values({dt})") for inserting to table was happening before the streaming query could complete. I thought awaitTermination wouldn't be required to be explicitly mentioned here.

raphaelblg
Honored Contributor II
Honored Contributor II

@Kutbuddin this is true, streaming queries are asynchronous and won't delay your code execution if not awaited for.

You can see in the example below that right after the query starts, the status is 'Initializing sources':

raphaelblg_0-1716922667104.png

But when using awaitTermination() the next row of code will only be executed after the query finishes:

raphaelblg_1-1716922758147.png

 

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks
Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!