cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Spark structured streaming

nileshtiwaari
New Contributor

hi,
could someone please help me with this code :-

input parameter df is a spark structured streaming dataframe
 def apply_duplicacy_check(df, duplicate_check_columns):
    if len(duplicate_check_columns) == 0:
         return None, df

    valid_df = df.dropDuplicates(duplicate_check_columns)

    error_df = df.exceptAll(valid_df)

    return error_df,valid_df

I am getting this error :- 

Except on a streaming DataFrame/Dataset on the right is not supported;
Except All true
:- Project [page#54781.Name AS division_name#54786, page#54781.ShortName AS short_name#54787, page#54781.ExternalSystemCode AS external_system_code#54788, page#54781.AccountingCode AS division_number#54789, page#54781.ParentDivisionId AS parent_division_id#54790, page#54781.TimeZone AS timezone#54791, page#54781.DivisionType.Id AS division_type_id#54792, page#54781.DivisionType.Name AS division_type_name#54793, sourceExtractDatetime#54773 AS source_extract_datetime#54794, page#54781.Id AS division_id#54795]
: +- Project [Data#54772, sourceExtractDatetime#54773, page#54781]
: +- Generate explode(Data#54772.Page), true, [page#54781]

1 REPLY 1

Kaniz_Fatma
Community Manager
Community Manager

Hi @nileshtiwaari , The error message youโ€™re encountering indicates that the .exceptAll() operation is not supported on streaming DataFrames. In structured streaming, certain operations have limitations due to the nature of streaming data.

To address this issue, consider the following points:

  1. Streaming DataFrame Limitations:

    • Streaming DataFrames have restrictions compared to batch DataFrames. For instance, you cannot directly use .exceptAll() on a streaming DataFrame.
    • Instead, you should use streaming-specific operations.
  2. Using .writeStream for Sinks:

    • If you want to redirect duplicates to an error DataFrame (which can be logged into an error table), you need to define a streaming sink.
    • Use .writeStream to specify the output mode (e.g., append, complete, or update) and the checkpoint location.
    • Hereโ€™s an example of how you can write the valid DataFrame to a Delta table:
    valid_df.writeStream.format("delta").outputMode("append").option("checkpointLocation", "dbfs:/checkpoint/location/").start()
    
  3. Joining with Static Datasets:

If you have any further questions or need additional assistance, feel free to ask! ๐Ÿ˜Š

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group