cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Incremental join transformation using Delta live tables

Edthehead
New Contributor III

I'm attempting to build an incremental data processing pipeline using delta live tables. The aim to stream data from a source multiple times in a day and join the data within the specific increment only.

I'm using autoloader to load the data incrementally from source.

I've taken a sample of 2 tables which I'm attempting to join with each other and produce a gold table. When I run the pipeline for the first time, the tables are joined correctly and the count of records in the gold table is correct.

In the next step, I attempt to load 2 records each in the input tables and reran the entire pipeline. I was expecting the final table row count to be incremented by 2. However I found that the number was > 2. On closer inspection, I realized that even though only 2 records were streamed in for each of the input tables, the join occurred with the entire table.
Is there a way I can have only the delta of each file be joined with each other rather than have the join occur over the entire table?

The joining step looks like this.

def load_table_to_gold(sqlstmt,temp,outtable):  
    @dlt.table(name=f"GOLD_{outtable}",temporary=temp, comment=f"This is my gold table GOLD_{outtable}")
    def gold_cold():
        golddf = spark.sql("SELECT c.customer_id, c.name, c.mobile_number, o.order_id, o.total_amount, c.ingested_date FROM STREAM LIVE.SILVER_Cust AS c INNER JOIN STREAM LIVE.SILVER_Orders AS o ON c.customer_id = o.customer_id")
        return golddf

As you can see from the pic, it shows that the 2 incoming tables streamed in 2 records each, however, the join that occurred spat out 19 records into the Gold table when I was expecting 2. 

pic.png

5 REPLIES 5

-werners-
Esteemed Contributor III

I do not use DLT, but it seems that what you want to achieve is not actually a direct merge on a target table
I think you want to combine the new incoming data only and append/merge that to the target table, correct?
If so, I would treat it like that: a join on the new data which is overwritten completely at each run, and this is appended to the target table.
Take

Edthehead
New Contributor III

Well yes, that is exactly what I would like to do. I want to do a join only within the new data and then append into the new tables. So if there are 2 new records in each input table, the append should also 2 new records. However, if I use streaming or DLT, the join is being done with the entire table and not just within the incremental new data. Can I achieve what I need with DLT? If not, what can I do using standard pyspark? Another option is using change data feed but I'm yet to explore that.

-werners-
Esteemed Contributor III

I suppose it is possible in dlt too, but in plain spark I would join the input tables into a dataframe, and that dataframe is then appended to the target table.

Edthehead
New Contributor III

When I try joining the 2 tables using streaming, the entire tables are joined with each other rather than just the incremental changes with each other. I'm really joining 2 tables which are being incremented in frequent intervals.  

 

-werners-
Esteemed Contributor III

basically you want to do a stream-stream join.  If you want to do that you need to take a few things into account (see link).
DLT might do this for you, but I never used it so I cannot confirm that.
If your source tables are delta tables, you could indeed use data feed.
Another option is to not use streaming and selecting the data from the source tables using a simple where condition.
Of course this will work but there is no management of state (what if a run has failed, what was the last update I did etc, unless you store some kind of watermark somewhere (without usign streams).

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.