Data Engineering
Regarding: How to use row_number() in DLT pipelines

anusha98
New Contributor

We have two streaming tables, customer_info and customer_info_history. We joined them with a full join in PySpark to create a temp table, and now we want to remove the duplicate records from that temp table. We tried using row_number() but are getting the error below. If anyone has a solution for this error, kindly let me know.

Error: Failed to start stream table in either append mode or complete mode. Append mode error: [NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING] Window function is not supported in ROW_NUMBER() (as column `row_num`) on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the WINDOW function.

code:

import dlt
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

@dlt.table(name="table")
def temp_latest_email():
    df = dlt.read_stream("table")

    # Non-time window over key columns -- this is what streaming rejects
    window_spec = Window.partitionBy("col1", "col2", "col3").orderBy(col("col3").desc())

    df_with_rownum = df.withColumn("row_num", row_number().over(window_spec))

    return df_with_rownum.filter(col("row_num") == 1).drop("row_num")

BS_THE_ANALYST
Esteemed Contributor II

Hey @anusha98

I'm not well-versed in Structured Streaming, but it certainly interests me.

Not sure if you've looked at the limitations on streaming tables yet: https://docs.databricks.com/aws/en/ldp/streaming-tables#streaming-table-limitations. The Structured Streaming docs on window operations (https://spark.apache.org/docs/latest/streaming/apis-on-dataframes-and-datasets.html#operations-on-st...) indicate that window functions on streaming DataFrames typically need to be based on an event-time window.

[Screenshot of the Structured Streaming documentation on window operations]

Hopefully that's not a red herring and is pointing in the right direction for the issue at hand. If nobody responds, I'll try and recreate the issue after work today.

All the best,
BS

K_Anudeep
Databricks Employee

Hello @anusha98 ,

You’re hitting a real limitation of Structured Streaming: non-time window functions (like row_number() over (...)) aren’t allowed on streaming DFs.

Instead, you can use groupBy() with agg(max(struct(...))) to get the latest value per key:

import dlt
from pyspark.sql import functions as F

@dlt.table(name="temp_latest_email")
def temp_latest_email():
    df = dlt.read_stream("table")
    return (
        # max() over a struct ordered with col3 first picks, per
        # (col1, col2) group, the row with the latest col3 -- a
        # streaming-safe stand-in for row_number() == 1
        df.groupBy("col1", "col2")
          .agg(F.max(F.struct("col3", "col1", "col2")).alias("r"))
          .select(F.col("r.col1").alias("col1"),
                  F.col("r.col2").alias("col2"),
                  F.col("r.col3").alias("col3"))
    )

 

Alternatively, de-duplicate each source stream with dropDuplicates (or, better for bounding state, dropDuplicatesWithinWatermark) before the actual join, as in the sketch below.

Please let me know if you have any further questions.

 

Anudeep