AttributeError: 'DataFrame' object has no attribute 'dropDuplicatesWithinWatermark'

johanjohan
New Contributor

Hello,

I'm having trouble deduplicating rows on the "id" column with the dropDuplicatesWithinWatermark method in a Delta Live Tables pipeline. When I run the pipeline, I get this error message:

"AttributeError: 'DataFrame' object has no attribute 'dropDuplicatesWithinWatermark'"

 

Here is part of the code:

 
@dlt.table(
    name="streaming_table",
    comment="This table is used to test the drop duplicates with watermark"
)
def streaming_table_fct():
    stream_df = spark.readStream.table("schema.table") \
        .filter(f.col("kind") == "abc") \
        .withWatermark("meta_created", "24 hours")

    stream_df.dropDuplicatesWithinWatermark(["id"])

    return stream_df

Kaniz
Community Manager

Hi @johanjohan, the error message "AttributeError: 'DataFrame' object has no attribute 'dropDuplicatesWithinWatermark'" means that the DataFrame class in your runtime has no such method. dropDuplicatesWithinWatermark is a real API, but it was only introduced in Apache Spark 3.5.0 (Databricks Runtime 14.0 and later), so on an older runtime or DLT channel the call fails with exactly this AttributeError.
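A quick way to check what your pipeline is actually running (spark here is the session object already available in your notebook or pipeline):

# The method only exists on Spark 3.5.0+ DataFrames.
print(spark.version)
print(hasattr(spark.range(1), "dropDuplicatesWithinWatermark"))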

Here are a few points to consider:

  1. Runtime Version: Either move the pipeline to a runtime/channel based on Spark 3.5.0 or later, or fall back to the dropDuplicates method, which is available everywhere. Also note that both methods return a new DataFrame rather than modifying the existing one: in your snippet the result of dropDuplicatesWithinWatermark is never assigned, so even on a supported runtime the deduplication would be lost.

  2. Watermark Column: When using dropDuplicates on a stream with a watermark, include the watermark column in the list of columns to deduplicate on. In your case the watermark column is "meta_created", so it should appear in the dropDuplicates call alongside "id" (see the example below); this lets Spark expire old deduplication state once it falls behind the watermark.

  3. Stateful Operation: If your deduplication rule is more complex than an exact match on columns (for example, keeping only the earliest record per "id" based on "meta_created"), you need a custom stateful operation; the built-in dropDuplicates method does not support that directly. A sketch follows the example below.

Here’s an alternative approach you can consider:

@dlt.table(
    name="streaming_table",
    comment="This table is used to test the drop duplicates with watermark"
)
def streaming_table_fct():
    stream_df = spark.readStream.table("schema.table") \
        .filter(f.col("kind") == "abc") \
        .withWatermark("meta_created", "24 hours")

    # Include the watermark column so Spark can expire per-key
    # deduplication state once it falls behind the watermark.
    stream_df = stream_df.dropDuplicates(["id", "meta_created"])

    return stream_df
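If you do need the custom stateful route from point 3, here is a minimal sketch using applyInPandasWithState, PySpark's entry point for arbitrary stateful processing (Spark 3.4+). The output/state schemas and the keep-first policy are illustrative assumptions, not taken from your pipeline; stream_df is the watermarked stream from above:

import pandas as pd
from typing import Iterator
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

# Illustrative schemas -- adjust to the real columns of schema.table.
OUTPUT_SCHEMA = "id STRING, meta_created TIMESTAMP"
STATE_SCHEMA = "seen BOOLEAN"

def keep_first(key, pdfs: Iterator[pd.DataFrame], state: GroupState) -> Iterator[pd.DataFrame]:
    # Emit a row for an id only the first time the id is seen; after
    # that, the state marks the id as known and later rows are dropped.
    seen = state.get[0] if state.exists else False
    for pdf in pdfs:
        if not seen:
            yield pdf.sort_values("meta_created").head(1)[["id", "meta_created"]]
            seen = True
    state.update((seen,))

deduped_df = (
    stream_df.groupBy("id")
    .applyInPandasWithState(
        keep_first,
        outputStructType=OUTPUT_SCHEMA,
        stateStructType=STATE_SCHEMA,
        outputMode="append",
        timeoutConf=GroupStateTimeout.NoTimeout,  # consider a timeout to bound state size
    )
)

In a DLT table function you would build deduped_df the same way and return it instead of stream_df.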

Remember to adjust the custom stateful operation according to your use case. If you need further assistance, feel free to ask! 😊
