Hello @anusha98,
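You're hitting a real limitation of Structured Streaming: non-time-based window functions (such as row_number() over (...)) are not supported on streaming DataFrames.
Instead, you can get the "latest value per key" with a groupBy aggregation that takes F.max() over a struct whose first field is the column that defines "latest":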
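    import dlt
    from pyspark.sql import functions as F

    @dlt.table(name="temp_latest_email")
    def temp_latest_email():
        df = dlt.read_stream("table")
        return (
            df.groupBy("col1", "col2")
            # max() over a struct compares fields left to right, so col3 decides which row wins
            .agg(F.max(F.struct("col3", "col1", "col2")).alias("r"))
            # unpack the winning struct back into flat columns
            .select(
                F.col("r.col1").alias("col1"),
                F.col("r.col2").alias("col2"),
                F.col("r.col3").alias("col3"),
            )
        )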
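Why this returns the latest row: Spark orders structs field by field in declaration order, the same way Python compares tuples. The plain-Python sketch below models that ordering outside Spark, assuming hypothetical (col1, col2, col3) rows where col3 is the "recency" column. It is only an illustration of the ordering rule, not Spark code.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, str, int]  # hypothetical (col1, col2, col3) rows


def latest_per_key(rows: Iterable[Row]) -> List[Row]:
    """Model of groupBy("col1", "col2").agg(max(struct("col3", "col1", "col2")))."""
    best: Dict[Tuple[str, str], Tuple[int, str, str]] = {}
    for col1, col2, col3 in rows:
        key = (col1, col2)
        # Same field order as the struct: col3 first, so it dominates the comparison.
        candidate = (col3, col1, col2)
        # Tuples compare left to right, so the candidate with the largest col3 wins.
        if key not in best or candidate > best[key]:
            best[key] = candidate
    # Unpack the winning "struct" back into (col1, col2, col3), like the .select(...) step.
    return sorted((c1, c2, c3) for c3, c1, c2 in best.values())


rows = [("a", "x", 1), ("a", "x", 5), ("a", "x", 3), ("b", "y", 2)]
print(latest_per_key(rows))
```

The same reasoning is why the ordering column has to be the first field in F.struct(...): if it came later, an earlier field would decide the comparison instead.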
Alternatively, use dropDuplicates() to de-duplicate the stream before the actual join.
Please let me know if you have any further questions.
Anudeep