01-19-2022 01:36 AM
I have a simple job scheduled every 5 minutes. Basically it listens for new files on a storage account via Auto Loader (cloudFiles) and writes them into a Delta table, extremely simple. The code is something like this:
df = (spark
  .readStream
  .format("cloudFiles")                                         # Auto Loader source
  .option("cloudFiles.format", "json")
  .load(input_path, schema=my_schema)
  .select(cols)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once=True)                                           # process available files, then stop
  .start(output_path))
Sometimes there are new files, sometimes not. After 40-60 batches it gets stuck on one particular batchId, as if there are no new files in the folder. If I run the script manually I get the same result: it points to the last actually processed batch + 1, but does not write it to the checkpoint folder:
{
  "id" : "***",
  "runId" : "***",
  "name" : null,
  "timestamp" : "2022-01-13T15:25:07.512Z",
  "batchId" : 64,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
Batch 64 does not exist; the last batch written to the checkpoint folder is 63!
If I run only the readStream part, it correctly reads the entire list of files (and starts a new batchId: 0). The strangest part is: I have absolutely no idea what causes it and why it takes around 40-60 batches to get this kind of error.
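Running only the readStream part means roughly this (a sketch for illustration; the memory sink and query name are just a throwaway probe, not part of the job):
probe = (spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load(input_path, schema=my_schema)
  .writeStream
  .format("memory")                     # throwaway in-memory sink, no Delta involved
  .queryName("autoloader_probe")        # hypothetical name, only used for this test
  .trigger(once=True)
  .start())
probe.awaitTermination()
# With no checkpointLocation, a fresh temporary checkpoint is used,
# so the stream starts again at batchId 0 and lists every file.
display(spark.table("autoloader_probe"))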
The files are the output of an Azure Function that is a subscriber of an Azure Service Bus. Basically it goes like this: Service Bus Queue (AVRO) --> Azure Function (AVRO --> JSON) --> Azure Blob Storage --> Delta Table. Each file has a timestamp in its name, so no two files share the same name and there should be no overwrites or updates.
I remade the job in Scala with the new trigger .trigger(Trigger.AvailableNow) and got the same result after just 34 batches. What is going on? My checkpoint folder is in the same folder as my Delta table (which is on Data Lake Gen2).
From the logs it seems like RocksDB does not see the new files for some reason:
WARN FileEventBackfiller: The directory to backfill is empty: dbfs:/mnt/input_path
Every time there is no error, nothing; it just gets stuck at some point. I tried moving the checkpoint folder to a different location, mounted on a different storage account. Same result. I have a feeling the problem lies in RocksDB, but the available documentation is very scarce. Looks like I won a reverse jackpot: nobody else is having the same problem! Can you please help me, or suggest any alternatives to Auto Loader?
Thank you so much!
01-19-2022 05:18 AM
Hi, have you tried configuring "cloudFiles.backfillInterval"?
The default value is None, but you can set it according to your use case.
Please note this option is only available in Databricks Runtime 8.4 and above.
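For example, a minimal sketch of adding a periodic backfill to the same source (the "1 day" value is only illustrative):
# Sketch only: a backfill interval makes Auto Loader re-list the input
# directory at the given interval, catching files missed by incremental
# or notification-based discovery. Requires Databricks Runtime 8.4+.
df = (spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.backfillInterval", "1 day")
  .load(input_path, schema=my_schema))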
Please file a support case for us to investigate this further.
01-20-2022 03:33 AM
There are two modes of operation: "Directory listing" and "File notification". I recommend file notification, as it creates an event queue about new files. It is also more efficient (and cheaper, since we avoid list operations on Azure) because there is no need to scan the directory. You can also create your own Event Grid subscription to have full control, but for a first test it is enough to change:
.option("cloudFiles.useNotifications","true")
01-21-2022 06:52 AM
I resolved it by using
.option('cloudFiles.useIncrementalListing', 'false')
Now, if I understand correctly, RocksDB reads the whole list of files instead of its mini "checkpoints" based on file names and timestamps. My guess is: my JSON file names are composed of flow_name + timestamp, like this:
flow_name_2022-01-18T14-19-50.018Z.json
Maybe some timestamps make it go into a non-existent directory because of the dots? This is the only explanation I have. I will test reverting back to incremental listing once I change the file names to something more neutral.
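For reference, the full job with the option applied looks roughly like this (same names as in my first post; sketch only):
# Incremental listing disabled: Auto Loader performs a full directory
# listing on each run instead of listing only files that sort after the
# last seen file name.
(spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useIncrementalListing", "false")
  .load(input_path, schema=my_schema)
  .select(cols)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once=True)
  .start(output_path))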
Thank you so much for your suggestions!
08-31-2023 01:22 AM
I had the same issue: files would randomly not be loaded.
Setting `.option("cloudFiles.useIncrementalListing", False)` seemed to do the trick!