Get Started Discussions
Start your journey with Databricks by joining discussions on getting started guides, tutorials, and introductory topics. Connect with beginners and experts alike to kickstart your Databricks experience.

How can I deduplicate data from my stream?

zll_0091
New Contributor III

Hi,

I'm new to Databricks and I'm trying to use a stream for my incremental data. This data has duplicates, which should be removable with a window function. Can you check where my code goes wrong?

1-------

#Using Auto Loader to read new files
schema = df1.schema

rStream = (
    spark.readStream.format("parquet")
    .option("includeExistingFiles", False)
    .schema(schema)
    .load(srcpath)
)
 
2------
#deltatable
from delta.tables import *
deltadf = DeltaTable.forName(spark,f"hive.final_table")
 
3------
#Merge and Deduplication
def mergetoDF(microdf, batchId):

  print(f"inside forEachBatch for batchid:{batchId}. Rows in passed dataframe:{microdf.count()}")

  microdf = Window.partitionBy("key1","key2").orderBy(col('process_key').desc())
  microdf = rStream.withColumn("rownum",row_number().over(microdf)).filter("rownum = 1").drop("rownum")

  (deltadf
   .merge(microdf, "source.key1 = target.key1 and source.key2 = target.key2")
   .whenMatchedUpdateAll("source.process_key <> target.process_key")
   .whenNotMatchedInsertAll()
   .execute()
   )
 
 
4------
wStream = (rStream.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", 'path/checkpoints/')
    .option("mergeSchema", "true")
    .outputMode("append")
    .foreachBatch(mergetoDF)
    .start())
 
 
 
 
I'm really confused since it's my first time using PySpark.
Looking forward to your help.
3 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @zll_0091 ,

Change the output mode to update. Other than that, your code looks fine, but I would rename the variable microdf to windowSpec, because right now it's a little confusing.
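For illustration, here is a minimal sketch of that rename inside the batch function, keeping the key1/key2/process_key column names from your post; the window is applied to the incoming micro-batch DataFrame:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

def mergetoDF(microdf, batchId):
    # window over the business keys, newest process_key first
    windowSpec = Window.partitionBy("key1", "key2").orderBy(col("process_key").desc())
    # keep only the latest row per key within this micro-batch
    microdf = (microdf
        .withColumn("rownum", row_number().over(windowSpec))
        .filter("rownum = 1")
        .drop("rownum"))
    # ... then merge microdf into the Delta table as before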

Hi @szymon_dybczak 

Thank you for your reply. I have updated the output mode and am now encountering the error below:

"py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/pyspark/sql/utils.py", line 119, in call
raise e
File "/databricks/spark/python/pyspark/sql/utils.py", line 116, in call
self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
File "<command-1456054439786523>", line 9, in mergetoDF
(deltadf
File "/databricks/spark/python/delta/tables.py", line 1159, in execute
self._jbuilder.execute()
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco
raise converted from None
pyspark.errors.exceptions.AnalysisException: cannot resolve source.key1 in search condition given columns spark_catalog.hive.final_table.key1, spark_catalog.hive.final_table.last_sync_version, spark_catalog.hive.final_table.last_sync_date, spark_catalog.hive.final_table.key2, spark_catalog.hive.final_table.process_key, key1, last_sync_version, last_sync_date, key2, process_key; line 1 pos 0"

szymon_dybczak
Esteemed Contributor III

Hi, 

 

In the merge you are referring to the source DataFrame as "source", but you need to alias the DataFrames first:

 

(deltadf.alias("target")
 .merge(
     microdf.alias("source"),
     "source.key1 = target.key1 AND source.key2 = target.key2"
 )

 
