<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Stream Query termination using available now trigger and toTable. in Administration &amp; Architecture</title>
    <link>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70839#M1227</link>
    <description>&lt;P&gt;Sharing a code example:&lt;BR /&gt;1. Read the stream from MongoDB.&lt;BR /&gt;2. Write the stream to a Delta table.&lt;BR /&gt;3. Insert into an internal checkpoint table.&lt;/P&gt;
&lt;DIV&gt;# COMMAND&lt;BR /&gt;query_cdc = (&lt;BR /&gt;spark.readStream.format("mongodb")&lt;BR /&gt;.option(&lt;BR /&gt;"spark.mongodb.connection.uri",&lt;BR /&gt;f"mongodb://{mongodb_user}:{mongodb_password}@{mongodb_host}/{source_database}.{source_collection}?authSource={mongodb_authsource}",&lt;BR /&gt;)&lt;BR /&gt;.option("spark.mongodb.database", source_database)&lt;BR /&gt;.option("spark.mongodb.collection", source_collection)&lt;BR /&gt;.option("spark.mongodb.change.stream.lookup.full.document", "updateLookup")&lt;BR /&gt;.option("spark.mongodb.read.aggregation.pipeline", "")&lt;BR /&gt;.schema(&lt;BR /&gt;{OurSchema}&lt;BR /&gt;)&lt;BR /&gt;.option("forceDeleteTempCheckpointLocation", "false")&lt;BR /&gt;.option("outputExtendedJson", "true")&lt;BR /&gt;.load()&lt;BR /&gt;)&lt;/DIV&gt;
&lt;DIV&gt;# COMMAND&lt;BR /&gt;import datetime&lt;BR /&gt;from pyspark.sql.functions import lit&lt;BR /&gt;&lt;BR /&gt;# Query start time as an integer, e.g. 20240528031039&lt;BR /&gt;dt = int(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))&lt;BR /&gt;&lt;BR /&gt;query = (query_cdc.withColumn("ingestionTime", lit(dt)).writeStream&lt;BR /&gt;.trigger(availableNow=True)&lt;BR /&gt;.partitionBy("ingestionTime")&lt;BR /&gt;.option("mergeSchema", "true")&lt;BR /&gt;.outputMode("append")&lt;BR /&gt;.option("checkpointLocation", target_bronze_checkpoint)&lt;BR /&gt;.toTable(f"{target_bronze_database}.{target_bronze_table}"))&lt;/DIV&gt;
&lt;DIV&gt;# COMMAND&lt;BR /&gt;spark.sql(f"insert into table {internal_tab_name} values({dt})")&lt;/DIV&gt;</description>
    <pubDate>Tue, 28 May 2024 03:10:39 GMT</pubDate>
    <dc:creator>Kutbuddin</dc:creator>
    <dc:date>2024-05-28T03:10:39Z</dc:date>
    <item>
      <title>Stream Query termination using available now trigger and toTable.</title>
      <link>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70672#M1223</link>
      <description>&lt;P&gt;We run a streaming job in Databricks with custom streaming logic that consumes a CDC stream from MongoDB and appends it to a Delta table. At the end of the job, our internal checkpointing logic inserts a row into a table with the timestamp of the query start.&lt;BR /&gt;We are seeing a difference between the time the checkpoint row is inserted and the time the query actually finishes.&lt;BR /&gt;&lt;BR /&gt;What could be the reason for this?&lt;BR /&gt;Can a different command be picked up for execution before the streaming query finishes?&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 07:04:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70672#M1223</guid>
      <dc:creator>Kutbuddin</dc:creator>
      <dc:date>2024-05-27T07:04:51Z</dc:date>
    </item>
    <item>
      <title>Re: Stream Query termination using available now trigger and toTable.</title>
      <link>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70803#M1226</link>
      <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/105771"&gt;@Kutbuddin&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Can you share more details? Any source code example?&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 19:32:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70803#M1226</guid>
      <dc:creator>raphaelblg</dc:creator>
      <dc:date>2024-05-27T19:32:09Z</dc:date>
    </item>
    <item>
      <title>Re: Stream Query termination using available now trigger and toTable.</title>
      <link>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70839#M1227</link>
      <description>&lt;P&gt;Sharing a code example:&lt;BR /&gt;1. Read the stream from MongoDB.&lt;BR /&gt;2. Write the stream to a Delta table.&lt;BR /&gt;3. Insert into an internal checkpoint table.&lt;/P&gt;
&lt;DIV&gt;# COMMAND&lt;BR /&gt;query_cdc = (&lt;BR /&gt;spark.readStream.format("mongodb")&lt;BR /&gt;.option(&lt;BR /&gt;"spark.mongodb.connection.uri",&lt;BR /&gt;f"mongodb://{mongodb_user}:{mongodb_password}@{mongodb_host}/{source_database}.{source_collection}?authSource={mongodb_authsource}",&lt;BR /&gt;)&lt;BR /&gt;.option("spark.mongodb.database", source_database)&lt;BR /&gt;.option("spark.mongodb.collection", source_collection)&lt;BR /&gt;.option("spark.mongodb.change.stream.lookup.full.document", "updateLookup")&lt;BR /&gt;.option("spark.mongodb.read.aggregation.pipeline", "")&lt;BR /&gt;.schema(&lt;BR /&gt;{OurSchema}&lt;BR /&gt;)&lt;BR /&gt;.option("forceDeleteTempCheckpointLocation", "false")&lt;BR /&gt;.option("outputExtendedJson", "true")&lt;BR /&gt;.load()&lt;BR /&gt;)&lt;/DIV&gt;
&lt;DIV&gt;# COMMAND&lt;BR /&gt;import datetime&lt;BR /&gt;from pyspark.sql.functions import lit&lt;BR /&gt;&lt;BR /&gt;# Query start time as an integer, e.g. 20240528031039&lt;BR /&gt;dt = int(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))&lt;BR /&gt;&lt;BR /&gt;query = (query_cdc.withColumn("ingestionTime", lit(dt)).writeStream&lt;BR /&gt;.trigger(availableNow=True)&lt;BR /&gt;.partitionBy("ingestionTime")&lt;BR /&gt;.option("mergeSchema", "true")&lt;BR /&gt;.outputMode("append")&lt;BR /&gt;.option("checkpointLocation", target_bronze_checkpoint)&lt;BR /&gt;.toTable(f"{target_bronze_database}.{target_bronze_table}"))&lt;/DIV&gt;
&lt;DIV&gt;# COMMAND&lt;BR /&gt;spark.sql(f"insert into table {internal_tab_name} values({dt})")&lt;/DIV&gt;</description>
      <pubDate>Tue, 28 May 2024 03:10:39 GMT</pubDate>
      <guid>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70839#M1227</guid>
      <dc:creator>Kutbuddin</dc:creator>
      <dc:date>2024-05-28T03:10:39Z</dc:date>
    </item>
    <item>
      <title>Re: Stream Query termination using available now trigger and toTable.</title>
      <link>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70951#M1230</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/105771"&gt;@Kutbuddin&lt;/a&gt;&amp;nbsp;What are you expecting to achieve? Which variable is off, and why?&lt;/P&gt;</description>
      <pubDate>Tue, 28 May 2024 18:20:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70951#M1230</guid>
      <dc:creator>raphaelblg</dc:creator>
      <dc:date>2024-05-28T18:20:55Z</dc:date>
    </item>
    <item>
      <title>Re: Stream Query termination using available now trigger and toTable.</title>
      <link>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70955#M1231</link>
      <description>&lt;P&gt;I was expecting spark.sql(f"insert into table {internal_tab_name} values({dt})") to execute at the end, after the streaming query had finished writing to the table.&lt;BR /&gt;&lt;BR /&gt;What I observed:&lt;BR /&gt;The spark.sql(f"insert into table {internal_tab_name} values({dt})") insert ran before the streaming query had completed. I thought awaitTermination wouldn't need to be called explicitly here.&lt;/P&gt;</description>
      <pubDate>Tue, 28 May 2024 18:28:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70955#M1231</guid>
      <dc:creator>Kutbuddin</dc:creator>
      <dc:date>2024-05-28T18:28:22Z</dc:date>
    </item>
    <item>
      <title>Re: Stream Query termination using available now trigger and toTable.</title>
      <link>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70961#M1232</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/105771"&gt;@Kutbuddin&lt;/a&gt;&amp;nbsp;That is expected: streaming queries are asynchronous and won't block the rest of your code unless you explicitly wait for them.&lt;BR /&gt;&lt;BR /&gt;You can see in the example below that right after the query starts, the status is&lt;STRONG&gt;&amp;nbsp;'Initializing sources'&lt;/STRONG&gt;:&lt;/P&gt;
&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="raphaelblg_0-1716922667104.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/7917i8BD263A4867D2138/image-size/medium?v=v2&amp;amp;px=400" role="button" title="raphaelblg_0-1716922667104.png" alt="raphaelblg_0-1716922667104.png" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;P&gt;But when using&amp;nbsp;&lt;STRONG&gt;awaitTermination()&lt;/STRONG&gt;, the next line of code is executed only after the query finishes:&lt;/P&gt;
&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="raphaelblg_1-1716922758147.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/7918i1B7B6050933226D0/image-size/medium?v=v2&amp;amp;px=400" role="button" title="raphaelblg_1-1716922758147.png" alt="raphaelblg_1-1716922758147.png" /&gt;&lt;/span&gt;&lt;/P&gt;
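&lt;P&gt;A minimal sketch of that ordering, runnable without a cluster. FakeStreamingQuery and run_then_checkpoint are hypothetical names introduced only for illustration; the fake mimics just two behaviors of a real StreamingQuery: it starts working in the background as soon as it is created, and awaitTermination() blocks until that work is done.&lt;/P&gt;

```python
import threading
import time


class FakeStreamingQuery:
    """Hypothetical stand-in for a StreamingQuery, so the sketch runs without Spark."""

    def __init__(self, work_seconds):
        self.finished = False
        self._thread = threading.Thread(target=self._run, args=(work_seconds,))
        self._thread.start()  # returns immediately; the work continues in the background

    def _run(self, work_seconds):
        time.sleep(work_seconds)  # pretend to process the available data
        self.finished = True

    def awaitTermination(self):
        self._thread.join()  # block the caller until the background work is done


def run_then_checkpoint(query, record_checkpoint):
    """Wait for the query to stop, and only then record the checkpoint."""
    query.awaitTermination()
    record_checkpoint()


events = []
query = FakeStreamingQuery(work_seconds=0.2)

# Right after creation the query is still running, so code placed here
# would execute before the write has finished.
started_async = not query.finished

# The checkpoint callback runs only once the query has terminated.
run_then_checkpoint(query, lambda: events.append(("checkpoint", query.finished)))
```

&lt;P&gt;Applied to the code in this thread, the same idea would be to call query.awaitTermination() on the object returned by toTable(...) before running the spark.sql insert.&lt;/P&gt;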
</description>
      <pubDate>Tue, 28 May 2024 18:59:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/administration-architecture/stream-query-termination-using-available-now-trigger-and-totable/m-p/70961#M1232</guid>
      <dc:creator>raphaelblg</dc:creator>
      <dc:date>2024-05-28T18:59:54Z</dc:date>
    </item>
  </channel>
</rss>

