Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Watermark error while joining multiple streaming tables

BricksGuy
New Contributor II

I am creating an ETL pipeline in which I read multiple streaming tables into temp views and, at the end, join them to produce the output of another live table. To do that, I use the method below, which takes a list of tables as a parameter. Inside the method, a for loop streams each table into its own temp view. After that, I run the SQL query to get the DataFrame for my new live table.

My Python method:

import dlt

def generate_dlt_from_sources(sources, target_name, sql_query, schema, main_table, temporary=False):
    @dlt.table(
        name=target_name,
        schema=schema,
        temporary=temporary,
    )
    def create_table():
        # Expose each streaming source as temp_<table name> for the SQL query
        for source in sources:
            spark.readStream.table(source).createOrReplaceTempView(f"temp_{source.split('.')[-1]}")
        # The parent table is read as a batch (non-streaming) DataFrame
        spark.read.table(main_table).createOrReplaceTempView(f"temp_{main_table}")
        return spark.sql(sql_query)

Below are my inputs to the method:

sources = ["table1","table2","table3","table4",------,"tablen"]
main_table = "parent_table"

target_name = "output_table"
sql_query = """WITH CTE_IDVALS AS (
    SELECT id, val FROM temp_table1
    UNION
    SELECT id, val FROM temp_table2
    UNION
    SELECT id, val FROM temp_table3
    UNION
    SELECT id, val FROM temp_table4
)
SELECT p.id, p.val
FROM temp_parent_table p LEFT JOIN CTE_IDVALS ON CTE_IDVALS.id = p.id
WHERE CTE_IDVALS.id IS NULL"""
schema = "id long, val string"

I get the following error when running it in the pipeline:
Failed to start stream XXXXX in either append mode or complete mode. Append mode error: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;

Please help!

 

 


-werners-
Esteemed Contributor III

A stream-stream left join needs a watermark.

For example:

stream_df = stream_df.withWatermark("timestamp_column", "30 minutes")
joined_df = stream_df.join(other_stream_df, "join_key")

BricksGuy
New Contributor II

Thank you for the reply. But in my case I am using a SQL query to read data from those temp views, so how can I handle the watermark issue in the scenario above?

-werners-
Esteemed Contributor III

Try spark.read.table(main_table).withWatermark("timestamp_column", "30 minutes").createOrReplaceTempView(f"temp_{main_table}")
In SQL, it is also possible using the WATERMARK clause.
https://learn.microsoft.com/en-us/azure/databricks/delta-live-tables/stateful-processing
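For the SQL route, the per-source SELECTs of the CTE could be generated with a watermark on every streaming FROM item. This is a minimal sketch, assuming the Databricks SQL WATERMARK clause takes the form "<from_item> WATERMARK <column> DELAY OF INTERVAL <n> MINUTES" (check the linked page for the exact syntax on your runtime) and assuming every source has a hypothetical event-time column; build_idvals_cte and watermarked_from_item are illustrative helpers, not part of any API.

```python
from typing import Iterable


def watermarked_from_item(view: str, ts_col: str, delay_minutes: int) -> str:
    # Assumed syntax of the Databricks SQL WATERMARK clause.
    return f"{view} WATERMARK {ts_col} DELAY OF INTERVAL {delay_minutes} MINUTES"


def build_idvals_cte(sources: Iterable[str], ts_col: str, delay_minutes: int) -> str:
    """Hypothetical helper: rebuild CTE_IDVALS for any number of sources,
    watermarking each streaming branch and keeping ts_col in the output so
    the outer join can use it in a time-range condition."""
    selects = []
    for source in sources:
        view = f"temp_{source.split('.')[-1]}"  # same naming as generate_dlt_from_sources
        selects.append(
            f"SELECT id, val, {ts_col} FROM "
            f"{watermarked_from_item(view, ts_col, delay_minutes)}"
        )
    if not selects:
        raise ValueError("at least one source is required")
    return "WITH CTE_IDVALS AS (\n    " + "\n    UNION\n    ".join(selects) + "\n)"


print(build_idvals_cte(["table1", "table2"], "event_ts", 30))
```

Keeping the event-time column in each SELECT means UNION now deduplicates on (id, val, event_ts) rather than on (id, val), which is the price of carrying the watermarked column through to the join.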

BricksGuy
New Contributor II

Do I need to add the watermark both when creating the temp view and in the SQL statement?

-werners-
Esteemed Contributor III

It is needed for the join, so if the DataFrame has a watermark, that's enough.
There is no need to define it multiple times.

BricksGuy
New Contributor II

I did put a watermark on the DataFrame, but I still get the same error when executing the SQL.

 

-werners-
Esteemed Contributor III

Sorry, I put the watermark on the non-streaming table. That is wrong, of course: the watermark has to be set on the streaming tables (the sources in this case).
