In our Delta Live Tables pipeline I am simply joining two streaming tables into a new streaming table.
We use the following code:
import dlt
from pyspark.sql import functions as f

@dlt.create_table()
def fact_event_faults():
    # Both inputs are read as streams, each with a 4-hour watermark
    events = dlt.read_stream('event_list').withWatermark('TimeStamp', '4 hours')
    files = dlt.read_stream('file_list').withWatermark('Trigger_DateTime', '4 hours')
    # Match on installation ID and a +/- 1 second window around the trigger time
    event_faults = events.join(
        files,
        (
            (events.TimeStamp >= (files.Trigger_DateTime - f.expr('INTERVAL 1 SECONDS'))) &
            (events.TimeStamp <= (files.Trigger_DateTime + f.expr('INTERVAL 1 SECONDS'))) &
            (events.Unique_Installation_ID == files.Unique_Installation_ID)
        ),
        how='leftouter')
    return event_faults
When I run this, the event_list table has 12K records and the file_list table has 4,000 records, but the event_faults table has only 76 records.
In a left outer join I would expect at least 12K records in the resulting table.
Executing the same query in a notebook, without streaming and without watermarking, returns the expected 12K+ rows.
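To make the expected batch behaviour concrete, here is a minimal plain-Python sketch (not Spark, and not the actual notebook query) of a left outer join with the same ID match and the same +/- 1 second window. The sample rows and the helper name `left_outer_range_join` are made up for illustration only.

```python
from datetime import datetime, timedelta


def left_outer_range_join(events, files, tolerance=timedelta(seconds=1)):
    """Keep every event; pair it with each matching file, or with None."""
    joined = []
    for event in events:
        matches = [
            file for file in files
            if file["Unique_Installation_ID"] == event["Unique_Installation_ID"]
            # Same as: Trigger_DateTime - 1s <= TimeStamp <= Trigger_DateTime + 1s
            and abs(event["TimeStamp"] - file["Trigger_DateTime"]) <= tolerance
        ]
        if matches:
            joined.extend((event, file) for file in matches)
        else:
            # Left outer semantics: an unmatched event is still emitted
            joined.append((event, None))
    return joined


# Hypothetical sample data, not taken from the real tables
t0 = datetime(2023, 1, 1, 12, 0, 0)
events = [
    {"Unique_Installation_ID": "A", "TimeStamp": t0},
    {"Unique_Installation_ID": "A", "TimeStamp": t0 + timedelta(minutes=5)},
    {"Unique_Installation_ID": "B", "TimeStamp": t0},
]
files = [
    {"Unique_Installation_ID": "A", "Trigger_DateTime": t0 + timedelta(seconds=1)},
]

result = left_outer_range_join(events, files)
print(len(result), "joined rows for", len(events), "events")
```

With these semantics the result can never have fewer rows than the left input, which is why I expected at least 12K rows in event_faults.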
What am I missing here?