Data Engineering

How to stop a dataframe with a federated table source from being re-evaluated when referenced (cache?)

Anske
New Contributor III

Hi,

Would anyone happen to know whether it's possible to cache in memory a dataframe that is the result of a query on a federated table?

I have a notebook that queries a federated table, does some transformations on the dataframe and then writes this dataframe to a delta table. However, every time the dataframe is referenced in the notebook, the query gets 're-executed' on the federated source (a SQL Server database), instead of operating on the dataframe in memory.

I would like to prevent this, such that the source is only queried once, and from there on, all dataframe functions operate on the dataframe in memory instead of on an updated dataset (without having to write to disk/delta table and then rereading it from disk/delta table again straight after). I'm processing batches of lsn_time_table, but the batch keeps changing every time I refer to the dataframe :-(.


Lakshay
Esteemed Contributor

Caching or persisting is the fastest option here, but it has a limitation: if your dataset is too big to fit into memory, it won't help, and Spark will still go back to the source data.

If you need to avoid referring to the source data completely, either checkpointing or writing the data to a file/table is a better option.

Anske
New Contributor III

Thanks for your answer Lakshay. I have tried caching the df by using the cache() function, but it does not seem to do anything (the dataset in this case is tiny, so I'm pretty sure it would fit into memory). So I'm indeed back to writing to file first and going from there.
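One thing that may explain this: `cache()` is lazy. On its own it only marks the dataframe for caching, and nothing is stored until an action runs on it. A minimal sketch of forcing that, where `cache_and_materialize` is a hypothetical helper name and `df` is assumed to be a regular PySpark DataFrame:

```python
def cache_and_materialize(df):
    """Mark df for caching, then run an action so the cache is actually filled."""
    cached = df.cache()  # lazy: only marks the plan for caching, no query runs yet
    cached.count()       # action: runs the source query once and fills the cache
    return cached
```

Usage would be something like `lsn_incr_batch = cache_and_materialize(lsn_incr_batch)` before the first write. Even then the cache is best effort: if cached partitions are evicted, Spark can recompute them from the source, so this reduces re-querying but does not strictly guarantee a frozen batch.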

 

daniel_sahal
Esteemed Contributor

@Anske 
Could you paste a code snippet here?

Anske
New Contributor III

@daniel_sahal , this is the code snippet:

lsn_incr_batch = spark.sql(f"""
select start_lsn,tran_begin_time,tran_end_time,tran_id,tran_begin_lsn,cast('{current_run_ts}' as timestamp) as appended
from externaldb.cdc.lsn_time_mapping
where tran_end_time > '{batch_end_ts}'
""")

#lsn_incr_batch.cache()

lsn_incr_batch.write.mode("append").saveAsTable("poc_catalog.cdc_source.lsn_time_mapping")
table = "cdctest"
ext_table = "externaldb.cdc.dbo_" + table + "_CT"
 
last_lsn = spark.sql(f"""
SELECT max_lsn
FROM poc_catalog.cdc_source.manifest
WHERE source_table_name = '{table}'
ORDER BY appended desc
LIMIT 1
""")
 
cdc_incr_batch = spark.sql(f"""
select ID,test1,test2,test3,`__$operation` as operation ,`__$start_lsn` as start_lsn,current_timestamp as appended 
from {ext_table} 
where `__$start_lsn` > 
(
select m.max_lsn
from poc_catalog.cdc_source.manifest m
where m.source_table_name = '{table}'
order by m.appended desc
limit 1
)
""") 
 
# filter out the records for which the change is not included in the lsn batch (yet)
lsn_incr_steady_batch = spark.sql(f"""
select * from poc_catalog.cdc_source.lsn_time_mapping 
where appended = '{current_run_ts}'  
""")
 
cdc_incr_batch_filtered = cdc_incr_batch.join(lsn_incr_steady_batch,["start_lsn"],"leftsemi") 
 
As you can see in the snippet, I am now first writing the lsn_incr_batch df to a delta table so that it remains an unchanged batch. However, I would prefer to use the in-memory dataframe in the semi-join to filter the cdc records from an individual table, instead of the table on disk...
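For what it's worth, here is a rough sketch of how the in-memory variant could look using the checkpointing idea mentioned earlier in the thread. It relies on `DataFrame.localCheckpoint(eager=True)`, which computes the rows once and cuts the lineage back to the source. The helper name `write_and_filter` and its parameters are hypothetical, and it assumes `localCheckpoint` is supported on the cluster type in use, which I have not verified for federated sources:

```python
def write_and_filter(lsn_incr_batch, cdc_incr_batch, target_table):
    """Snapshot the LSN batch once, append it to a table, and semi-join against the snapshot."""
    # eager=True runs the source query now and truncates the plan, so later
    # actions read the checkpointed rows instead of re-querying the source.
    lsn_snapshot = lsn_incr_batch.localCheckpoint(eager=True)

    # Append exactly the rows held in the snapshot to the target table.
    lsn_snapshot.write.mode("append").saveAsTable(target_table)

    # Keep only the cdc rows whose start_lsn appears in that same snapshot.
    return cdc_incr_batch.join(lsn_snapshot, ["start_lsn"], "leftsemi")
```

With the names from the snippet above, this would be called as `write_and_filter(lsn_incr_batch, cdc_incr_batch, "poc_catalog.cdc_source.lsn_time_mapping")`. Note that a local checkpoint lives on the executors, so it is lost if they go away; writing to the delta table first remains the more robust option.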