<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic How can I deduplicate data from my stream? in Get Started Discussions</title>
    <link>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80157#M7893</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I'm new to databricks and I'm trying to use stream for my incremental data. This data has duplicates which can be solved using a window function. Can you check where my code goes wrong?&lt;/P&gt;&lt;P&gt;1-------&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#Using Auto Loader to read new files &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;schema &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df1.schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;rStream &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.readStream.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"parquet"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"includeExistingFiles"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;schema&lt;/SPAN&gt;&lt;SPAN&gt;(schema).&lt;/SPAN&gt;&lt;SPAN&gt;load&lt;/SPAN&gt;&lt;SPAN&gt;(srcpath)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;2------&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#deltatable&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; delta.tables &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;*&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;deltadf &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; DeltaTable.&lt;/SPAN&gt;&lt;SPAN&gt;forName&lt;/SPAN&gt;&lt;SPAN&gt;(spark,&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"hive.final_table"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;3------&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#Merge and 
Deduplication&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;mergetoDF&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;microdf&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;batchId&lt;/SPAN&gt;&lt;SPAN&gt;):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"inside forEachBatch for batchid:&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batchId&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;. Rows in passed dataframe:&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;microdf.&lt;/SPAN&gt;&lt;SPAN&gt;count&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; microdf &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; Window.&lt;/SPAN&gt;&lt;SPAN&gt;partitionBy&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"key1"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;"key2"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;orderBy&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'process_key'&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;desc&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; microdf &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; rStream.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"rownum"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;row_number&lt;/SPAN&gt;&lt;SPAN&gt;().&lt;/SPAN&gt;&lt;SPAN&gt;over&lt;/SPAN&gt;&lt;SPAN&gt;(microdf))&lt;/SPAN&gt;&lt;SPAN&gt;.filter&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;rownum &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; 
&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;drop&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"rownum"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; (deltadf&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;merge&lt;/SPAN&gt;&lt;SPAN&gt;(microdf, &lt;/SPAN&gt;&lt;SPAN&gt;"source.key1= target.key1and source.key2= target.key2"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;whenMatchedUpdateAll&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"source.process_key&amp;lt;&amp;gt; target.process_key"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;whenNotMatchedInsertAll&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;execute&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;4------&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;wStream&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (rStream.writeStream \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;trigger&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;availableNow&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"checkpointLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, 
&lt;/SPAN&gt;&lt;SPAN&gt;'path/checkpoints/'&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"mergeSchema"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;outputMode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;foreachBatch&lt;/SPAN&gt;&lt;SPAN&gt;(mergetoDF) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;start&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="zll_0091_1-1721743629228.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/9816i325F0559846FF5A2/image-size/medium/is-moderation-mode/true?v=v2&amp;amp;px=400" role="button" title="zll_0091_1-1721743629228.png" alt="zll_0091_1-1721743629228.png" /&gt;&lt;/span&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;I'm really confused since it's my first time using pyspark.&lt;/DIV&gt;&lt;DIV&gt;Looking forward to your help.&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
    <pubDate>Tue, 23 Jul 2024 14:07:37 GMT</pubDate>
    <dc:creator>zll_0091</dc:creator>
    <dc:date>2024-07-23T14:07:37Z</dc:date>
    <item>
      <title>How can I deduplicate data from my stream?</title>
      <link>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80157#M7893</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I'm new to databricks and I'm trying to use stream for my incremental data. This data has duplicates which can be solved using a window function. Can you check where my code goes wrong?&lt;/P&gt;&lt;P&gt;1-------&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#Using Auto Loader to read new files &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;schema &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; df1.schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;rStream &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.readStream.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"parquet"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"includeExistingFiles"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;schema&lt;/SPAN&gt;&lt;SPAN&gt;(schema).&lt;/SPAN&gt;&lt;SPAN&gt;load&lt;/SPAN&gt;&lt;SPAN&gt;(srcpath)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;2------&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#deltatable&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; delta.tables &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;*&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;deltadf &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; DeltaTable.&lt;/SPAN&gt;&lt;SPAN&gt;forName&lt;/SPAN&gt;&lt;SPAN&gt;(spark,&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"hive.final_table"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;3------&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#Merge and 
Deduplication&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;mergetoDF&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;microdf&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;batchId&lt;/SPAN&gt;&lt;SPAN&gt;):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"inside forEachBatch for batchid:&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batchId&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;. Rows in passed dataframe:&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;microdf.&lt;/SPAN&gt;&lt;SPAN&gt;count&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; microdf &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; Window.&lt;/SPAN&gt;&lt;SPAN&gt;partitionBy&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"key1"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;"key2"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;orderBy&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'process_key'&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;desc&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; microdf &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; rStream.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"rownum"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;row_number&lt;/SPAN&gt;&lt;SPAN&gt;().&lt;/SPAN&gt;&lt;SPAN&gt;over&lt;/SPAN&gt;&lt;SPAN&gt;(microdf))&lt;/SPAN&gt;&lt;SPAN&gt;.filter&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;rownum &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; 
&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;drop&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"rownum"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; (deltadf&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;merge&lt;/SPAN&gt;&lt;SPAN&gt;(microdf, &lt;/SPAN&gt;&lt;SPAN&gt;"source.key1= target.key1and source.key2= target.key2"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;whenMatchedUpdateAll&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"source.process_key&amp;lt;&amp;gt; target.process_key"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;whenNotMatchedInsertAll&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;.&lt;/SPAN&gt;&lt;SPAN&gt;execute&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;4------&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;wStream&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (rStream.writeStream \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;trigger&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;availableNow&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"checkpointLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, 
&lt;/SPAN&gt;&lt;SPAN&gt;'path/checkpoints/'&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"mergeSchema"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;outputMode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;foreachBatch&lt;/SPAN&gt;&lt;SPAN&gt;(mergetoDF) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;start&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="zll_0091_1-1721743629228.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/9816i325F0559846FF5A2/image-size/medium/is-moderation-mode/true?v=v2&amp;amp;px=400" role="button" title="zll_0091_1-1721743629228.png" alt="zll_0091_1-1721743629228.png" /&gt;&lt;/span&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;I'm really confused since it's my first time using pyspark.&lt;/DIV&gt;&lt;DIV&gt;Looking forward to your help.&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Tue, 23 Jul 2024 14:07:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80157#M7893</guid>
      <dc:creator>zll_0091</dc:creator>
      <dc:date>2024-07-23T14:07:37Z</dc:date>
    </item>
    <item>
      <title>Re: How can I deduplicate data from my stream?</title>
      <link>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80160#M7894</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/113366"&gt;@zll_0091&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;Change the output mode to update. Other than that, your code looks fine, but I would rename the variable microdf to windowSpec, because right now it's a little confusing.&lt;/P&gt;</description>
      <pubDate>Tue, 23 Jul 2024 14:33:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80160#M7894</guid>
      <dc:creator>szymon_dybczak</dc:creator>
      <dc:date>2024-07-23T14:33:51Z</dc:date>
    </item>
    <item>
      <title>Re: How can I deduplicate data from my stream?</title>
      <link>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80234#M7895</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/110502"&gt;@szymon_dybczak&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thank you for your reply. I have updated the output mode and am now encountering the error below:&lt;/P&gt;&lt;P&gt;"py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):&lt;BR /&gt;File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy&lt;BR /&gt;return_value = getattr(self.pool[obj_id], method)(*params)&lt;BR /&gt;File "/databricks/spark/python/pyspark/sql/utils.py", line 119, in call&lt;BR /&gt;raise e&lt;BR /&gt;File "/databricks/spark/python/pyspark/sql/utils.py", line 116, in call&lt;BR /&gt;self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)&lt;BR /&gt;File "&amp;lt;command-1456054439786523&amp;gt;", line 9, in mergetoDF&lt;BR /&gt;(deltadf&lt;BR /&gt;File "/databricks/spark/python/delta/tables.py", line 1159, in execute&lt;BR /&gt;self._jbuilder.execute()&lt;BR /&gt;File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__&lt;BR /&gt;return_value = get_return_value(&lt;BR /&gt;File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco&lt;BR /&gt;raise converted from None&lt;BR /&gt;pyspark.errors.exceptions.AnalysisException: cannot resolve source.key1 in search condition given columns spark_catalog.hive.final_table.key1, spark_catalog.hive.final_table.last_sync_version, spark_catalog.hive.final_table.last_sync_date, spark_catalog.hive.final_table.key2, spark_catalog.hive.final_table.process_key, key1, last_sync_version, last_sync_date, key2, process_key; line 1 pos 0"&lt;/P&gt;</description>
      <pubDate>Wed, 24 Jul 2024 02:21:24 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80234#M7895</guid>
      <dc:creator>zll_0091</dc:creator>
      <dc:date>2024-07-24T02:21:24Z</dc:date>
    </item>
    <item>
      <title>Re: How can I deduplicate data from my stream?</title>
      <link>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80251#M7896</link>
      <description>&lt;P&gt;Hi,&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;In the merge condition you are referring to the source data frame as source, but you first need to alias the data frames:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;(deltadf .alias("target") .merge( microdf.alias("source"), "source.key1 = target.key1 AND source.key2 = target.key2&lt;/P&gt;&lt;P&gt;" )&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 24 Jul 2024 05:43:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/how-can-i-deduplicate-data-from-my-stream/m-p/80251#M7896</guid>
      <dc:creator>szymon_dybczak</dc:creator>
      <dc:date>2024-07-24T05:43:07Z</dc:date>
    </item>
  </channel>
</rss>

