
How can I deduplicate data from my stream?

zll_0091
New Contributor III

Hi,

I'm new to Databricks and I'm trying to use a stream for my incremental data. The data contains duplicates, which I think can be removed with a window function. Can you check where my code goes wrong?

1-------

# Using Auto Loader to read new files
schema = df1.schema

rStream = (
    spark.readStream.format("parquet")
    .option("includeExistingFiles", False)
    .schema(schema)
    .load(srcpath)
)
 
2------
# Target Delta table
from delta.tables import *
deltadf = DeltaTable.forName(spark, "hive.final_table")
 
3------
# Merge and deduplication
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

def mergetoDF(microdf, batchId):

  print(f"inside forEachBatch for batchId: {batchId}. Rows in passed dataframe: {microdf.count()}")

  microdf = Window.partitionBy("key1", "key2").orderBy(col("process_key").desc())
  microdf = rStream.withColumn("rownum", row_number().over(microdf)).filter("rownum = 1").drop("rownum")

  (deltadf
   .merge(microdf, "source.key1 = target.key1 AND source.key2 = target.key2")
   .whenMatchedUpdateAll("source.process_key <> target.process_key")
   .whenNotMatchedInsertAll()
   .execute()
   )
 
 
4------
wStream = (rStream.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", 'path/checkpoints/')
    .option("mergeSchema", "true")
    .outputMode("append")
    .foreachBatch(mergetoDF)
    .start())
 
 
(screenshot attached: zll_0091_1-1721743629228.png)
 
 
I'm really confused since it's my first time using PySpark.
Looking forward to your help.
3 REPLIES

szymon_dybczak
Contributor III

Hi @zll_0091 ,

Change the output mode to update. Other than that, your code looks fine, but I would rename the variable microdf to windowSpec, because right now it's a little confusing.
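
Something like this (a rough, untested sketch reusing the column names from your post; note it also applies the window to the micro-batch DataFrame passed into foreachBatch rather than to rStream, which is an extra adjustment beyond the renaming):

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

def mergetoDF(microdf, batchId):
  # Window spec, renamed from microdf to windowSpec for clarity
  windowSpec = Window.partitionBy("key1", "key2").orderBy(col("process_key").desc())
  # Keep only the newest row per (key1, key2) within the micro-batch
  microdf = (microdf
             .withColumn("rownum", row_number().over(windowSpec))
             .filter("rownum = 1")
             .drop("rownum"))
  # ... merge into deltadf as in the original post ...

wStream = (rStream.writeStream
           .trigger(availableNow=True)
           .option("checkpointLocation", 'path/checkpoints/')
           .outputMode("update")  # changed from "append"
           .foreachBatch(mergetoDF)
           .start())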

Hi @szymon_dybczak 

Thank you for your reply. I have updated the output mode and am now encountering the error below:

"py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/pyspark/sql/utils.py", line 119, in call
raise e
File "/databricks/spark/python/pyspark/sql/utils.py", line 116, in call
self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
File "<command-1456054439786523>", line 9, in mergetoDF
(deltadf
File "/databricks/spark/python/delta/tables.py", line 1159, in execute
self._jbuilder.execute()
File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco
raise converted from None
pyspark.errors.exceptions.AnalysisException: cannot resolve source.key1 in search condition given columns spark_catalog.hive.final_table.key1, spark_catalog.hive.final_table.last_sync_version, spark_catalog.hive.final_table.last_sync_date, spark_catalog.hive.final_table.key2, spark_catalog.hive.final_table.process_key, key1, last_sync_version, last_sync_date, key2, process_key; line 1 pos 0"

Hi,

In the merge you are referring to the source DataFrame as "source", but you first need to alias the DataFrames:

(deltadf.alias("target")
 .merge(
     microdf.alias("source"),
     "source.key1 = target.key1 AND source.key2 = target.key2"
 )

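For completeness, the full call could look roughly like this (untested), reusing the whenMatched/whenNotMatched clauses from your original post:

(deltadf.alias("target")
 .merge(
     microdf.alias("source"),
     "source.key1 = target.key1 AND source.key2 = target.key2"
 )
 # Update only when the incoming process_key differs from the stored one
 .whenMatchedUpdateAll("source.process_key <> target.process_key")
 .whenNotMatchedInsertAll()
 .execute())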
 
