Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

AttributeError: 'DataFrame' object has no attribute 'dropDuplicatesWithinWatermark'

johanjohan
New Contributor

Hello,

I'm having trouble deduplicating rows on the "id" column with the "dropDuplicatesWithinWatermark" method in a DLT pipeline. When I run this pipeline, I get the error message:

"AttributeError: 'DataFrame' object has no attribute 'dropDuplicatesWithinWatermark'"

 

Here is part of the code:

 
@dlt.table(
    name="streaming_table",
    comment="This table is used to test the drop duplicates with watermark"
)
def streaming_table_fct():
    stream_df = spark.readStream.table("schema.table") \
        .filter(f.col("kind") == "abc") \
        .withWatermark("meta_created", "24 hours")

    stream_df = stream_df.dropDuplicatesWithinWatermark(["id"])

    return stream_df
1 REPLY

Kaniz
Community Manager

Hi @johanjohan, the error message you received, “AttributeError: ‘DataFrame’ object has no attribute ‘dropDuplicatesWithinWatermark’”, means the DataFrame class in your runtime does not define that method. dropDuplicatesWithinWatermark was introduced in Apache Spark 3.5.0 (available in Databricks Runtime 14.0 and above), so on earlier runtimes this call fails with exactly this AttributeError.

Here are a few points to consider:

  1. Runtime Version: dropDuplicatesWithinWatermark requires Apache Spark 3.5.0 or later (Databricks Runtime 14.0+). Check which runtime your DLT pipeline runs on; upgrading the pipeline's runtime makes the method available. On older runtimes, use dropDuplicates instead.

  2. Watermark Column: when falling back to dropDuplicates on a streaming DataFrame, include the watermark column in the subset of columns checked for duplicates; otherwise Spark cannot expire old deduplication state. In your case the watermark column is “meta_created”, so call dropDuplicates(["id", "meta_created"]).

  3. Stateful Operation: if you need deduplication rules that neither method covers (for example, keeping only the most recent event per id), you may need a custom stateful operation such as flatMapGroupsWithState or applyInPandasWithState. The built-in dropDuplicates method does not support that directly.
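If you want your pipeline code to work on both old and new runtimes, one option is to branch on the Spark version reported by spark.version. A minimal sketch (the helper below is hypothetical, not a Spark or Databricks API; it only assumes that dropDuplicatesWithinWatermark is available from Apache Spark 3.5.0 onward):

```python
# Hypothetical helper: decide whether the running Spark supports
# dropDuplicatesWithinWatermark (added in Apache Spark 3.5.0).
def supports_within_watermark(spark_version: str) -> bool:
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    return (major, minor) >= (3, 5)

# In a real pipeline you would pass spark.version, e.g.:
#   if supports_within_watermark(spark.version):
#       stream_df = stream_df.dropDuplicatesWithinWatermark(["id"])
#   else:
#       stream_df = stream_df.dropDuplicates(["id", "meta_created"])
print(supports_within_watermark("3.5.0"))  # True  (DBR 14.0)
print(supports_within_watermark("3.4.1"))  # False (older runtimes)
```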

Here’s the fallback approach for runtimes older than Spark 3.5:

@dlt.table(
    name="streaming_table",
    comment="This table is used to test the drop duplicates with watermark"
)
def streaming_table_fct():
    stream_df = spark.readStream.table("schema.table") \
        .filter(f.col("kind") == "abc") \
        .withWatermark("meta_created", "24 hours")

    # Include the watermark column in the subset so Spark can
    # drop deduplication state once the watermark passes it.
    stream_df = stream_df.dropDuplicates(["id", "meta_created"])

    return stream_df

Remember to adjust the custom stateful operation according to your use case. If you need further assistance, feel free to ask! 😊
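For intuition, dropDuplicatesWithinWatermark keeps the first event per key and holds that state only until the watermark passes it, so a duplicate id arriving within the watermark window is dropped, while the same id arriving much later is emitted again. A rough pure-Python sketch of that behavior (a toy model with timestamps in hours, not the actual Spark implementation, whose state expiry happens per micro-batch):

```python
# Toy model of watermark-scoped deduplication: events are (id, event_time).
# State for an id is discarded once the watermark (max event time seen
# minus the delay) moves past the time at which the id was recorded.
def dedup_within_watermark(events, delay_hours):
    seen = {}  # id -> event_time when first kept
    max_time = float("-inf")
    kept = []
    for event_id, t in events:
        max_time = max(max_time, t)
        watermark = max_time - delay_hours
        # Expire state that has fallen behind the watermark.
        seen = {k: v for k, v in seen.items() if v >= watermark}
        if event_id not in seen:
            seen[event_id] = t
            kept.append((event_id, t))
    return kept

events = [("a", 0), ("a", 1), ("b", 2), ("a", 30)]
print(dedup_within_watermark(events, delay_hours=24))
# [('a', 0), ('b', 2), ('a', 30)]
# ("a", 1) is a duplicate within 24 hours and is dropped;
# ("a", 30) arrives after the state expired and is kept again.
```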
