Administration & Architecture
Explore discussions on Databricks administration, deployment strategies, and architectural best practices. Connect with administrators and architects to optimize your Databricks environment for performance, scalability, and security.

Stream query termination using the availableNow trigger and toTable

Kutbuddin
New Contributor II

We are running a streaming job in Databricks with custom streaming logic that consumes a CDC stream from MongoDB and appends to a Delta table. At the end of the job, internal checkpointing logic inserts a row into a table with the query start timestamp.
We are seeing a difference between the checkpoint row's insertion time and the time the query actually finishes.

What could be the reason for this?
Can a subsequent command be taken up for execution before the streaming query finishes?

1 ACCEPTED SOLUTION

raphaelblg
Databricks Employee

@Kutbuddin this is true: streaming queries run asynchronously and won't delay your subsequent code unless you explicitly wait for them.

You can see in the example below that right after the query starts, the status is 'Initializing sources':

[screenshot: immediately after start, query.status reports 'Initializing sources']

But when using awaitTermination(), the next line of code will only execute after the query finishes:

[screenshot: with awaitTermination(), the following command runs only after the query terminates]
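The same ordering behavior can be sketched without a cluster. A minimal plain-Python threading analogy (illustrative only, not the Spark API: `stream_job` stands in for the streaming query, `join()` for `awaitTermination()`):

```python
# Threading analogy for StreamingQuery semantics (illustrative, not the Spark API):
# start() returns immediately, so the next statement can run before the
# background work finishes; join() plays the role of awaitTermination().
import threading
import time

events = []

def stream_job():
    time.sleep(0.2)  # stand-in for processing the availableNow batch
    events.append("stream finished")

t = threading.Thread(target=stream_job)
t.start()  # returns at once, like writeStream...toTable()
events.append("checkpoint insert (no await)")  # recorded before the job is done

t.join()  # blocks until the job completes, like query.awaitTermination()
events.append("checkpoint insert (after await)")

print(events)
# → ['checkpoint insert (no await)', 'stream finished', 'checkpoint insert (after await)']
```

Without `join()`, the "checkpoint insert" lands before the stream finishes; after `join()`, it is guaranteed to come last.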

 

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks


5 Replies

raphaelblg
Databricks Employee

Hello @Kutbuddin,

Can you share more details? Any source code example?

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

Kutbuddin
New Contributor II

Sharing a code example:
1. Read stream from MongoDB.
2. Write stream to a Delta table.
3. Insert into an internal checkpoint table.

# COMMAND
import datetime

from pyspark.sql.functions import lit

query_cdc = (
    spark.readStream.format("mongodb")
    .option(
        "spark.mongodb.connection.uri",
        f"mongodb://{mongodb_user}:{mongodb_password}@{mongodb_host}/{source_database}.{source_collection}?authSource={mongodb_authsource}",
    )
    .option("spark.mongodb.database", source_database)
    .option("spark.mongodb.collection", source_collection)
    .option("spark.mongodb.change.stream.lookup.full.document", "updateLookup")
    .option("spark.mongodb.read.aggregation.pipeline", "")
    .schema({OurSchema})
    .option("forceDeleteTempCheckpointLocation", "false")
    .option("outputExtendedJson", "true")
    .load()
)

# COMMAND
dt = int(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))

query = (
    query_cdc.withColumn("ingestionTime", lit(dt))
    .writeStream
    .trigger(availableNow=True)
    .partitionBy("ingestionTime")
    .option("mergeSchema", "true")
    .outputMode("append")
    .option("checkpointLocation", target_bronze_checkpoint)
    .toTable(f"{target_bronze_database}.{target_bronze_table}")
)

# COMMAND
spark.sql(f"insert into table {internal_tab_name} values({dt})")

raphaelblg
Databricks Employee

@Kutbuddin What are you expecting to achieve? Which value is off, and why?

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

Kutbuddin
New Contributor II

I was expecting spark.sql(f"insert into table {internal_tab_name} values({dt})") to execute only after the streaming query had finished writing to the table.

What I observed:
The spark.sql(f"insert into table {internal_tab_name} values({dt})") insert was happening before the streaming query could complete. I thought awaitTermination() wouldn't need to be called explicitly here.

