04-08-2025 04:26 AM
Hi all,
I'm using Auto Loader to ingest files. Each file contains changed data from a table, and I merge it into a Delta table. This works fine.
But if I want to re-ingest all the files (for example, after deleting the checkpoint location), I need to re-ingest them in order and follow the sequence; otherwise the merge operation produces wrong results.
To achieve this, I followed this strategy:
- add a column (_metadata.file_path) in the readStream
- the trigger is always availableNow=True
- the foreachBatch function contains this logic: extract the distinct file paths from batch_df, then iterate over them in order and apply the merge operation.
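The setup described in the list above could look roughly like the following. This is only a sketch under stated assumptions: the function name `start_ordered_ingest`, its parameters, the `parquet` file format, and the use of a schema location are all illustrative and not taken from the original pipeline.

```python
def start_ordered_ingest(spark, source_path, checkpoint_path, schema_path, process_batch):
    """Start an Auto Loader stream that tags every row with its source file path.

    All argument names are hypothetical; `spark` is an existing SparkSession.
    """
    stream_df = (
        spark.readStream
        .format("cloudFiles")
        # Assumption: the real file format is not stated in the post.
        .option("cloudFiles.format", "parquet")
        # Assumption: schema inference with a schema location is used.
        .option("cloudFiles.schemaLocation", schema_path)
        .load(source_path)
        # Expose the file path so each micro-batch can later be split per file.
        .selectExpr("*", "_metadata.file_path AS _source_file_path")
    )
    return (
        stream_df.writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", checkpoint_path)
        # Process everything currently available, then stop.
        .trigger(availableNow=True)
        .start()
    )
```

Deleting the directory passed as `checkpoint_path` is what makes the stream pick up every file again on the next run.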
It seems that extracting the distinct file paths always returns a NoneType, and then it fails when the iteration starts.
Any idea how I can solve this problem?
Thanks
from pyspark.sql import functions as F

def process_batch(microBatchOutputDF, batchId):
    if not microBatchOutputDF.isEmpty():
        #df_with_file = microBatchOutputDF.filter(F.col("_source_file_path").isNotNull())
        # get the distinct files in this batch
        files = microBatchOutputDF.select(F.col("_source_file_path")).distinct().collect()
        if len(files) > 0:
            # the iteration fails here with the NoneType error
            for file in files.sort():
                filename = file["_source_file_path"]
                print(f"Processing file: {filename}")
                # filter records for this specific file
                file_df = microBatchOutputDF.filter(f"(_source_file_path='{filename}')")
                process_single_file(file_df, filename, batchId)
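One likely cause, offered as a sketch rather than a confirmed diagnosis: Python's `list.sort()` sorts the list in place and returns `None`, so `for file in files.sort()` tries to iterate over `None`. The `collect()` call itself returns a normal list of rows. A possible rework uses `sorted()` instead. The helper name `ordered_file_paths` is hypothetical, and `process_single_file` is the function from the post, not defined here.

```python
def ordered_file_paths(rows, column="_source_file_path"):
    """Return the distinct, non-null file paths from collected rows, sorted ascending."""
    paths = {row[column] for row in rows if row[column] is not None}
    # sorted() returns a new list; list.sort() would return None.
    return sorted(paths)


def process_batch(microBatchOutputDF, batchId):
    # collect() returns a plain Python list of Row objects (empty for an empty batch).
    rows = microBatchOutputDF.select("_source_file_path").distinct().collect()
    for filename in ordered_file_paths(rows):
        print(f"Processing file: {filename}")
        # Column comparison avoids building a SQL string around the path.
        file_df = microBatchOutputDF.filter(
            microBatchOutputDF["_source_file_path"] == filename
        )
        # process_single_file is the poster's own merge function.
        process_single_file(file_df, filename, batchId)
```

This sorts paths as plain strings, so it matches the intended sequence only if the file names sort lexicographically in ingestion order (for example, zero-padded counters or timestamps). It also orders files only within a single micro-batch; the order in which batches themselves arrive is a separate question.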