<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: how to stop dataframe with federated table source to be reevaluated when referenced (cache?) in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/67021#M33260</link>
    <description>Thread in the Data Engineering forum on how to stop a dataframe with a federated table source from being re-evaluated when referenced (cache?).</description>
    <pubDate>Tue, 23 Apr 2024 07:55:18 GMT</pubDate>
    <dc:creator>Anske</dc:creator>
    <dc:date>2024-04-23T07:55:18Z</dc:date>
    <item>
      <title>how to stop dataframe with federated table source to be reevaluated when referenced (cache?)</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/66943#M33241</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;Would anyone happen to know whether it's possible to cache, in memory, a dataframe that is the result of a query on a federated table?&lt;/P&gt;&lt;P&gt;I have a notebook that queries a federated table, does some transformations on the dataframe and then writes it to a delta table. However, every time the notebook references the dataframe, the query gets re-executed against the federated source (a SQL Server database) instead of operating on the dataframe in memory.&amp;nbsp;&lt;/P&gt;&lt;P&gt;I would like to prevent this, so that the source is queried only once and all subsequent dataframe operations work on the in-memory dataframe rather than on an updated dataset (without having to write to a delta table on disk and re-read it from disk straight after). I'm processing batches of lsn_time_mapping, but the batch keeps changing every time I refer to the dataframe :-(.&lt;/P&gt;</description>
      <pubDate>Mon, 22 Apr 2024 14:40:43 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/66943#M33241</guid>
      <dc:creator>Anske</dc:creator>
      <dc:date>2024-04-22T14:40:43Z</dc:date>
    </item>
    <item>
      <title>Re: how to stop dataframe with federated table source to be reevaluated when referenced (cache?)</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/66961#M33245</link>
      <description>&lt;P&gt;Caching or persisting is the fastest option here, but it has a limitation: if your dataset is big and cannot fit into memory, this won't help and Spark will still go back to the source data.&lt;/P&gt;
&lt;P&gt;If you need to avoid re-reading the source data completely, checkpointing or writing the data to a file/table is the better option.&lt;/P&gt;</description>
      <pubDate>Mon, 22 Apr 2024 19:42:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/66961#M33245</guid>
      <dc:creator>Lakshay</dc:creator>
      <dc:date>2024-04-22T19:42:18Z</dc:date>
    </item>
    <item>
      <title>Re: how to stop dataframe with federated table source to be reevaluated when referenced (cache?)</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/66995#M33257</link>
      <description>&lt;P&gt;Thanks for your answer Lakshay. I have tried caching the df using the cache() function, but it does not seem to do anything (the dataset in this case is tiny, so I'm pretty sure it would fit into memory). So I'm indeed back to writing to a table first and going from there.&lt;/P&gt;</description>
      <pubDate>Tue, 23 Apr 2024 06:12:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/66995#M33257</guid>
      <dc:creator>Anske</dc:creator>
      <dc:date>2024-04-23T06:12:35Z</dc:date>
    </item>
    <item>
      <title>Re: how to stop dataframe with federated table source to be reevaluated when referenced (cache?)</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/67005#M33258</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/103531"&gt;@Anske&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;Could you paste a code snippet here?&lt;/P&gt;</description>
      <pubDate>Tue, 23 Apr 2024 06:56:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/67005#M33258</guid>
      <dc:creator>daniel_sahal</dc:creator>
      <dc:date>2024-04-23T06:56:36Z</dc:date>
    </item>
    <item>
      <title>Re: how to stop dataframe with federated table source to be reevaluated when referenced (cache?)</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/67021#M33260</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/79106"&gt;@daniel_sahal&lt;/a&gt;&amp;nbsp;, this is the code snippet:&lt;/P&gt;&lt;PRE&gt;lsn_incr_batch = spark.sql(f"""
select start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn, cast('{current_run_ts}' as timestamp) as appended
from externaldb.cdc.lsn_time_mapping
where tran_end_time &amp;gt; '{batch_end_ts}'
""")

#lsn_incr_batch.cache()
lsn_incr_batch.write.mode("append").saveAsTable("poc_catalog.cdc_source.lsn_time_mapping")

table = "cdctest"
ext_table = "externaldb.cdc.dbo_" + table + "_CT"

last_lsn = spark.sql(f"""
SELECT max_lsn
FROM poc_catalog.cdc_source.manifest
WHERE source_table_name = '{table}'
ORDER BY appended desc
LIMIT 1
""")

cdc_incr_batch = spark.sql(f"""
select ID, test1, test2, test3, `__$operation` as operation, `__$start_lsn` as start_lsn, current_timestamp as appended
from {ext_table}
where `__$start_lsn` &amp;gt;
(
select m.max_lsn
from poc_catalog.cdc_source.manifest m
where m.source_table_name = '{table}'
order by m.appended desc
limit 1
)
""")

# filter out the records for which the change is not included in the lsn batch (yet)
lsn_incr_steady_batch = spark.sql(f"""
select * from poc_catalog.cdc_source.lsn_time_mapping
where appended = '{current_run_ts}'
""")

cdc_incr_batch_filtered = cdc_incr_batch.join(lsn_incr_steady_batch, ["start_lsn"], "leftsemi")&lt;/PRE&gt;&lt;P&gt;As you can see in the snippet, I am now first writing the lsn_incr_batch df to a delta table so that it remains an unchanged batch, but I would prefer to use the in-memory dataframe in the semi-join that filters the cdc records of an individual table, instead of the table on disk...&lt;/P&gt;</description>
      <pubDate>Tue, 23 Apr 2024 07:55:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-stop-dataframe-with-federated-table-source-to-be/m-p/67021#M33260</guid>
      <dc:creator>Anske</dc:creator>
      <dc:date>2024-04-23T07:55:18Z</dc:date>
    </item>
  </channel>
</rss>

