<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Autoloader includeExistingFiles with retry didn't update the schema in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76533#M35248</link>
    <description>&lt;DIV&gt;&lt;SPAN&gt;Below is the sample one&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;batch_operation&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;df&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;&lt;span class="lia-unicode-emoji" title=":disappointed_face:"&gt;😞&lt;/span&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"batch_id: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; src_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; filtr_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; src_table_df \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'load_ctl_key'&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;(aws_lck)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; src_columns &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df.columns&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# checksum text column is generated and added to the DataFrame.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; if condition&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; df2 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;create_chk_sum_txt&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;df&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;df, &lt;/SPAN&gt;&lt;SPAN&gt;cols&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;col_info, &lt;/SPAN&gt;&lt;SPAN&gt;exc_cols&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;exclude_columns)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;else&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; df2 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df3 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df2&lt;/SPAN&gt;&lt;SPAN&gt;.filter&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;id&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNull&lt;/SPAN&gt;&lt;SPAN&gt;() &lt;/SPAN&gt;&lt;SPAN&gt;|&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;length&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"id"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;50&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;|&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;length&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cd"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;10&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;|&lt;/SPAN&gt;&lt;SPAN&gt; \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"5yr"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNotNull&lt;/SPAN&gt;&lt;SPAN&gt;() &lt;/SPAN&gt;&lt;SPAN&gt;&amp;amp;&lt;/SPAN&gt; &lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"5yr"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;cast&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"int"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNull&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# &amp;nbsp; &amp;nbsp;columns=df3.columns&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"df3 completed"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df4 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df3.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"error_reason"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;when&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"id"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNull&lt;/SPAN&gt;&lt;SPAN&gt;(), &lt;/SPAN&gt;&lt;SPAN&gt;"id is Null"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;when&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;length&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"id"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;50&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"id exceeds column size in base table"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;otherwise&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Miscellaneous Error"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# &amp;nbsp; Filter valid records to be loaded to Base Table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df5 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df2.&lt;/SPAN&gt;&lt;SPAN&gt;exceptAll&lt;/SPAN&gt;&lt;SPAN&gt;(df3).&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"error_reason"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;""&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; ins_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df5.&lt;/SPAN&gt;&lt;SPAN&gt;count&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; filtr_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; src_cnt &lt;/SPAN&gt;&lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt; ins_cnt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'src_cnt: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(src_cnt))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'ins_cnt: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(ins_cnt))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'filtr_cnt: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(filtr_cnt))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; select_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df5.&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;*&lt;/SPAN&gt;&lt;SPAN&gt;col_info)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"select_df completed"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; err_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df4.&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;*&lt;/SPAN&gt;&lt;SPAN&gt;col_info)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"err_df completed"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Final Schema before writing to UC:"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; select_df.&lt;/SPAN&gt;&lt;SPAN&gt;printSchema&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; err_df.&lt;/SPAN&gt;&lt;SPAN&gt;printSchema&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; transformed_df.write&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"catalog&lt;/SPAN&gt;&lt;SPAN&gt;.schema.&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;trgt_tbl&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; err_df.write&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(f"catalog.schema.{err_tbl}"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;if&lt;/SPAN&gt; &lt;SPAN&gt;int&lt;/SPAN&gt;&lt;SPAN&gt;(load_ctl_key) &lt;/SPAN&gt;&lt;SPAN&gt;!=&lt;/SPAN&gt; &lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Pushing metrics to UC"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; payload &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"load_ctl_key"&lt;/SPAN&gt;&lt;SPAN&gt;: load_ctl_key,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"job_id"&lt;/SPAN&gt;&lt;SPAN&gt;: job_id,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"batch_id"&lt;/SPAN&gt;&lt;SPAN&gt;: batch_id,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"table"&lt;/SPAN&gt;&lt;SPAN&gt;: trgt_tbl,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"src_cnt"&lt;/SPAN&gt;&lt;SPAN&gt;: src_cnt,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"filter_cnt"&lt;/SPAN&gt;&lt;SPAN&gt;: filtr_cnt,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"ins_cnt"&lt;/SPAN&gt;&lt;SPAN&gt;: ins_cnt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Payload: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(payload))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; payload_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.read.&lt;/SPAN&gt;&lt;SPAN&gt;json&lt;/SPAN&gt;&lt;SPAN&gt;(sc.&lt;/SPAN&gt;&lt;SPAN&gt;parallelize&lt;/SPAN&gt;&lt;SPAN&gt;([payload]))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;display&lt;/SPAN&gt;&lt;SPAN&gt;(payload_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; payload_df.write.&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"catalog.schema&lt;/SPAN&gt;&lt;SPAN&gt;.audit"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;</description>
    <pubDate>Tue, 02 Jul 2024 16:17:31 GMT</pubDate>
    <dc:creator>ajithgaade</dc:creator>
    <dc:date>2024-07-02T16:17:31Z</dc:date>
    <item>
      <title>Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76421#M35211</link>
      <description>&lt;P&gt;Hi,&lt;BR /&gt;written in pyspark.&lt;/P&gt;&lt;P&gt;databricks autoloader job with retry didn't merge/update the schema.&lt;/P&gt;&lt;P&gt;spark.readStream.format("cloudFiles")&lt;/P&gt;&lt;P&gt;.option("cloudFiles.format", "parquet")&lt;/P&gt;&lt;P&gt;.option("cloudFiles.schemaLocation", checkpoint_path)&lt;/P&gt;&lt;P&gt;.option("cloudFiles.includeExistingFiles", "false"&lt;/P&gt;&lt;P&gt;.load(source_path)&lt;/P&gt;&lt;P&gt;.writeStream&lt;/P&gt;&lt;P&gt;.queryName("write stream query")&lt;/P&gt;&lt;P&gt;.option("checkpointLocation", checkpoint_path)&lt;/P&gt;&lt;P&gt;.trigger(availableNow=True)&lt;/P&gt;&lt;P&gt;.forEachBatch(batch_operation)&lt;/P&gt;&lt;P&gt;.option("mergeSchema", True)&lt;/P&gt;&lt;P&gt;.start()&lt;/P&gt;&lt;P&gt;.awaitTermination()&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Error:&amp;nbsp;&lt;/P&gt;&lt;P&gt;Error while reading file s3://path&lt;BR /&gt;Caused by: UnknownFieldException: [UNKNOWN_FILED_EXCEPTION.NEW_FILEDS_IN_FILE] Encountered unknown fields during parsing: filed type, which can be fixed by an automatic retry: false&lt;/P&gt;&lt;P&gt;tried running couple of times. set retry = 2 for job and task as well.&lt;/P&gt;&lt;P&gt;please can you help?&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jul 2024 03:38:41 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76421#M35211</guid>
      <dc:creator>ajithgaade</dc:creator>
      <dc:date>2024-07-02T03:38:41Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76434#M35213</link>
      <description>&lt;P&gt;What happens if you enable rescue mode?&lt;/P&gt;&lt;LI-CODE lang="python"&gt;.option('cloudFiles.schemaEvolutionMode', 'rescue')&lt;/LI-CODE&gt;</description>
      <pubDate>Tue, 02 Jul 2024 06:17:40 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76434#M35213</guid>
      <dc:creator>Witold</dc:creator>
      <dc:date>2024-07-02T06:17:40Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76438#M35214</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/106731"&gt;@ajithgaade&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;
&lt;P&gt;If you are using a merge statement inside the forEachBatch function&amp;nbsp;&lt;SPAN&gt;batch_operation then you have to use DBR 15.2 and above to evolve the schema&lt;BR /&gt;&lt;A href="https://docs.databricks.com/en/delta/update-schema.html#automatic-schema-evolution-for-delta-lake-merge" target="_blank"&gt;https://docs.databricks.com/en/delta/update-schema.html#automatic-schema-evolution-for-delta-lake-merge&lt;/A&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jul 2024 06:22:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76438#M35214</guid>
      <dc:creator>Giri-Patcham</dc:creator>
      <dc:date>2024-07-02T06:22:04Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76513#M35237</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/107959"&gt;@Witold&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;rescue option won't evolve the schema.&lt;BR /&gt;&lt;A href="https://docs.databricks.com/en/ingestion/auto-loader/schema.html#:~:text=evolve%20data%20types.-,rescue,-Schema%20is%20never" target="_blank" rel="noopener"&gt;https://docs.databricks.com/en/ingestion/auto-loader/schema.html#:~:text=evolve%20data%20types.-,rescue,-Schema%20is%20never&lt;/A&gt;&lt;BR /&gt;&lt;BR /&gt;my requirement is schema should evolve automatically&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jul 2024 14:12:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76513#M35237</guid>
      <dc:creator>ajithgaade</dc:creator>
      <dc:date>2024-07-02T14:12:09Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76516#M35240</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/16818"&gt;@Giri-Patcham&lt;/a&gt;&lt;BR /&gt;batch operation doesn't has merge statement. I am dropping tables and recreating. Tried clearing checkpoint location many times and different options.&amp;nbsp;Tried with DBR 15.3, No Luck.&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jul 2024 13:53:29 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76516#M35240</guid>
      <dc:creator>ajithgaade</dc:creator>
      <dc:date>2024-07-02T13:53:29Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76527#M35245</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/106731"&gt;@ajithgaade&lt;/a&gt;&amp;nbsp;can you share the sample code inside the batch function ?&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jul 2024 15:10:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76527#M35245</guid>
      <dc:creator>Giri-Patcham</dc:creator>
      <dc:date>2024-07-02T15:10:42Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76533#M35248</link>
      <description>&lt;DIV&gt;&lt;SPAN&gt;Below is the sample one&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;batch_operation&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;df&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;&lt;span class="lia-unicode-emoji" title=":disappointed_face:"&gt;😞&lt;/span&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"batch_id: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; src_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; filtr_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; src_table_df \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'load_ctl_key'&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;(aws_lck)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; src_columns &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df.columns&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# checksum text column is generated and added to the DataFrame.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; if condition&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; df2 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;create_chk_sum_txt&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;df&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;df, &lt;/SPAN&gt;&lt;SPAN&gt;cols&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;col_info, &lt;/SPAN&gt;&lt;SPAN&gt;exc_cols&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;exclude_columns)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;else&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; df2 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df3 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df2&lt;/SPAN&gt;&lt;SPAN&gt;.filter&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;id&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNull&lt;/SPAN&gt;&lt;SPAN&gt;() &lt;/SPAN&gt;&lt;SPAN&gt;|&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;length&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"id"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;50&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;|&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;length&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cd"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;10&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;|&lt;/SPAN&gt;&lt;SPAN&gt; \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"5yr"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNotNull&lt;/SPAN&gt;&lt;SPAN&gt;() &lt;/SPAN&gt;&lt;SPAN&gt;&amp;amp;&lt;/SPAN&gt; &lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"5yr"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;cast&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"int"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNull&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# &amp;nbsp; &amp;nbsp;columns=df3.columns&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"df3 completed"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df4 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df3.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"error_reason"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;when&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"id"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNull&lt;/SPAN&gt;&lt;SPAN&gt;(), &lt;/SPAN&gt;&lt;SPAN&gt;"id is Null"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;when&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;length&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"id"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;50&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"id exceeds column size in base table"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;otherwise&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Miscellaneous Error"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# &amp;nbsp; Filter valid records to be loaded to Base Table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df5 &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df2.&lt;/SPAN&gt;&lt;SPAN&gt;exceptAll&lt;/SPAN&gt;&lt;SPAN&gt;(df3).&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"error_reason"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;""&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; ins_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df5.&lt;/SPAN&gt;&lt;SPAN&gt;count&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; filtr_cnt &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; src_cnt &lt;/SPAN&gt;&lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt; ins_cnt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'src_cnt: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(src_cnt))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'ins_cnt: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(ins_cnt))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'filtr_cnt: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(filtr_cnt))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; select_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df5.&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;*&lt;/SPAN&gt;&lt;SPAN&gt;col_info)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"select_df completed"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; err_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df4.&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;*&lt;/SPAN&gt;&lt;SPAN&gt;col_info)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"err_df completed"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; logger.&lt;/SPAN&gt;&lt;SPAN&gt;info&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Final Schema before writing to UC:"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; select_df.&lt;/SPAN&gt;&lt;SPAN&gt;printSchema&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; err_df.&lt;/SPAN&gt;&lt;SPAN&gt;printSchema&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; transformed_df.write&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"catalog&lt;/SPAN&gt;&lt;SPAN&gt;.schema.&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;trgt_tbl&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; err_df.write&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(f"catalog.schema.{err_tbl}"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;if&lt;/SPAN&gt; &lt;SPAN&gt;int&lt;/SPAN&gt;&lt;SPAN&gt;(load_ctl_key) &lt;/SPAN&gt;&lt;SPAN&gt;!=&lt;/SPAN&gt; &lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Pushing metrics to UC"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; payload &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"load_ctl_key"&lt;/SPAN&gt;&lt;SPAN&gt;: load_ctl_key,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"job_id"&lt;/SPAN&gt;&lt;SPAN&gt;: job_id,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"batch_id"&lt;/SPAN&gt;&lt;SPAN&gt;: batch_id,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"table"&lt;/SPAN&gt;&lt;SPAN&gt;: trgt_tbl,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"src_cnt"&lt;/SPAN&gt;&lt;SPAN&gt;: src_cnt,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"filter_cnt"&lt;/SPAN&gt;&lt;SPAN&gt;: filtr_cnt,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"ins_cnt"&lt;/SPAN&gt;&lt;SPAN&gt;: ins_cnt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Payload: &lt;/SPAN&gt;&lt;SPAN&gt;{}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(payload))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; payload_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.read.&lt;/SPAN&gt;&lt;SPAN&gt;json&lt;/SPAN&gt;&lt;SPAN&gt;(sc.&lt;/SPAN&gt;&lt;SPAN&gt;parallelize&lt;/SPAN&gt;&lt;SPAN&gt;([payload]))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;display&lt;/SPAN&gt;&lt;SPAN&gt;(payload_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; payload_df.write.&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"catalog.schema&lt;/SPAN&gt;&lt;SPAN&gt;.audit"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;</description>
      <pubDate>Tue, 02 Jul 2024 16:17:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76533#M35248</guid>
      <dc:creator>ajithgaade</dc:creator>
      <dc:date>2024-07-02T16:17:31Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76535#M35250</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/106731"&gt;@ajithgaade&lt;/a&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;can you try setting this conf&amp;nbsp;&lt;/P&gt;
&lt;DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;spark.conf.&lt;/SPAN&gt;&lt;SPAN&gt;set&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.databricks.delta.schema.autoMerge.enabled"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;</description>
      <pubDate>Tue, 02 Jul 2024 17:18:01 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76535#M35250</guid>
      <dc:creator>Giri-Patcham</dc:creator>
      <dc:date>2024-07-02T17:18:01Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader includeExistingFiles with retry didn't update the schema</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76548#M35259</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;Try this :&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Auto Loader Schema Evolution") \
    .getOrCreate()

# Source and checkpoint paths
source_path = "s3://path"
checkpoint_path = "/path/to/checkpoint"

# Define batch processing function
def batch_operation(df, epoch_id):
    # Perform your batch operations here
    # For example, write to Delta table with schema merge
    df.write \
      .format("delta") \
      .mode("append") \
      .option("mergeSchema", "true") \
      .save("/path/to/delta/table")

# Read stream with schema evolution
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.includeExistingFiles", "false") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .load(source_path)

# Write stream with schema merge
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .foreachBatch(batch_operation) \
    .option("mergeSchema", "true") \
    .start()

query.awaitTermination()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;and try&amp;nbsp;Setting Retry Policies&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="javascript"&gt;{
  "tasks": [
    {
      "task_key": "example-task",
      "notebook_task": {
        "notebook_path": "/path/to/your/notebook"
      },
      "max_retries": 2,
      "min_retry_interval_millis": 60000,
      "retry_on_timeout": true
    }
  ]
}&lt;/LI-CODE&gt;</description>
      <pubDate>Tue, 02 Jul 2024 20:26:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-includeexistingfiles-with-retry-didn-t-update-the/m-p/76548#M35259</guid>
      <dc:creator>mtajmouati</dc:creator>
      <dc:date>2024-07-02T20:26:16Z</dc:date>
    </item>
  </channel>
</rss>

