Let's understand the complexity behind this code when it is executed against a Delta table with Spark.
from pyspark.sql.functions import col
pks = spark.read.format("jdbc").option("url", jdbc_url).option("query", "SELECT pk FROM sql_table_name").load()
delta_table = spark.read.table(delta_table_name)
r = delta_table.filter(~col("pk").isin([row["pk"] for row in pks.collect()]))
display(r)
Line 2: You are reading all primary keys from the given table in the OLTP system into a DataFrame over JDBC.
Line 3: Reading the Delta table into a DataFrame.
Line 4: This line is actually causing the complexity. You collect the pks DataFrame into a Python list and pass it to filter() on the Delta table DataFrame. Building that list loops through every row of pks and pulls all PK values to the driver, which increases the runtime by consuming both compute and memory, and the resulting isin() expression grows with the number of keys.
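As a side note, the same result can be produced without collecting anything to the driver by expressing the filter as an anti join, so the comparison stays distributed; a quick sketch reusing the pks and delta_table DataFrames from above:

# Rows of the Delta table whose pk no longer exists in the SQL table,
# computed as a distributed anti join instead of a driver-side isin() list
r = delta_table.join(pks, on="pk", how="left_anti")
display(r)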
Solution:
You can maintain a delete log table in your SQL instance (OLTP) and use it only to drive deletes in your Delta table. Since I am assuming you are not soft deleting data in the SQL table, a deleted row is gone for good, so you can create a trigger in SQL Server that records the deleted rows (or at least their primary keys) into this log table, which can then be leveraged from Spark.
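On the Spark side the clean-up then becomes a small incremental job; a minimal sketch, assuming a log table named delete_log that stores the deleted primary keys (the table name, jdbc_url and delta_table_name here are placeholders):

from delta.tables import DeltaTable

# Read the primary keys logged by the SQL Server delete trigger (placeholder names)
deleted_pks = spark.read.format("jdbc").option("url", jdbc_url).option("query", "SELECT pk FROM delete_log").load()

# Remove the matching rows from the Delta table in a single distributed MERGE
target = DeltaTable.forName(spark, delta_table_name)
target.alias("t").merge(deleted_pks.alias("d"), "t.pk = d.pk").whenMatchedDelete().execute()

Once the merge succeeds, the processed entries can be cleared from delete_log so each run only handles new deletions.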
Regards,
Hari Prasad