@daniel_sahal, this is the code snippet:
# Append this run's slice of the LSN-to-time mapping to a Delta table,
# so the batch stays unchanged for the rest of the run
lsn_incr_batch = spark.sql(f"""
    SELECT start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn,
           CAST('{current_run_ts}' AS timestamp) AS appended
    FROM externaldb.cdc.lsn_time_mapping
    WHERE tran_end_time > '{batch_end_ts}'
""")
# lsn_incr_batch.cache()
lsn_incr_batch.write.mode("append").saveAsTable("poc_catalog.cdc_source.lsn_time_mapping")
table = "cdctest"
ext_table = "externaldb.cdc.dbo_" + table + "_CT"

# Most recently appended max LSN for this table (note: last_lsn is not used
# further down; the same lookup is repeated as a subquery in cdc_incr_batch)
last_lsn = spark.sql(f"""
    SELECT max_lsn
    FROM poc_catalog.cdc_source.manifest
    WHERE source_table_name = '{table}'
    ORDER BY appended DESC
    LIMIT 1
""")
# Pull the CDC changes that arrived after the last recorded max LSN
cdc_incr_batch = spark.sql(f"""
    SELECT ID, test1, test2, test3,
           `__$operation` AS operation,
           `__$start_lsn` AS start_lsn,
           current_timestamp AS appended
    FROM {ext_table}
    WHERE `__$start_lsn` >
    (
        SELECT m.max_lsn
        FROM poc_catalog.cdc_source.manifest m
        WHERE m.source_table_name = '{table}'
        ORDER BY m.appended DESC
        LIMIT 1
    )
""")
# Filter out the records whose change is not included in the LSN batch (yet):
# re-read the rows just written for this run and semi-join on start_lsn
lsn_incr_steady_batch = spark.sql(f"""
    SELECT * FROM poc_catalog.cdc_source.lsn_time_mapping
    WHERE appended = '{current_run_ts}'
""")
cdc_incr_batch_filtered = cdc_incr_batch.join(lsn_incr_steady_batch, ["start_lsn"], "leftsemi")
As you can see in the snippet, I am currently writing the lsn_incr_batch DataFrame to a Delta table first, so that it remains an unchanged batch. I would prefer, however, to use the in-memory DataFrame in the semi-join that filters the CDC records of an individual table, instead of the table on disk...
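Conceptually, something like the following is what I have in mind (just a sketch reusing the names from the snippet above; cache() plus an eager action such as count() to pin the batch before anything else reads it):

# Sketch: pin the batch in memory so the semi-join sees the same
# immutable slice that gets appended to the Delta table
lsn_incr_batch.cache()
lsn_incr_batch.count()  # eager action to force materialization of the cached batch

lsn_incr_batch.write.mode("append").saveAsTable("poc_catalog.cdc_source.lsn_time_mapping")

# Semi-join directly against the cached in-memory DataFrame instead of
# re-reading poc_catalog.cdc_source.lsn_time_mapping from disk
cdc_incr_batch_filtered = cdc_incr_batch.join(lsn_incr_batch, ["start_lsn"], "leftsemi")

That way the semi-join would run against exactly the batch that was appended to the Delta table, without a second read from disk.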