Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks Autoloader is getting stuck and does not pass to the next batch

Maksym
New Contributor III

I have a simple job scheduled every 5 minutes. It listens for new files on a storage account through cloudFiles and writes them into a Delta table. The code is something like this:

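# Read new JSON files with Auto Loader and append them to a Delta table.
# start() returns a StreamingQuery, not a DataFrame.
query = (spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load(input_path, schema = my_schema)
  .select(cols)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once = True)  # process whatever is available, then stop
  .start(output_path))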

Sometimes there are new files, sometimes not. After 40-60 batches the stream gets stuck on one particular batchId, as if there were no new files in the folder. If I run the script manually I get the same result: the progress report points to the last processed batch + 1, but that batch is never written to the checkpoint folder:

{
  "id" : "***",
  "runId" : "***",
  "name" : null,
  "timestamp" : "2022-01-13T15:25:07.512Z",
  "batchId" : 64,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,

Batch 64 does not exist, last batch written to checkpoint folder is 63!
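
One way to confirm what the checkpoint actually contains is to compare the highest batch id under its offsets and commits subfolders, which is where Structured Streaming records planned and completed batches. The following is a minimal sketch, assuming the checkpoint folder is reachable as a local path (for example through a /dbfs mount); the function names and the example path are hypothetical.

```python
import os


def latest_batch_id(checkpoint_dir, subdir):
    """Return the highest batch id found in checkpoint_dir/subdir, or None."""
    path = os.path.join(checkpoint_dir, subdir)
    if not os.path.isdir(path):
        return None
    # Batch files are named by their integer batch id; ignore anything else
    # (temporary files, checksum files, and so on).
    ids = [int(name) for name in os.listdir(path) if name.isdigit()]
    return max(ids, default=None)


def checkpoint_status(checkpoint_dir):
    """Summarise the last planned (offsets) and last finished (commits) batch."""
    last_offset = latest_batch_id(checkpoint_dir, "offsets")
    last_commit = latest_batch_id(checkpoint_dir, "commits")
    return {
        "last_offset": last_offset,
        "last_commit": last_commit,
        # True when a batch was planned but never committed.
        "uncommitted": last_offset is not None and last_offset != last_commit,
    }


# Hypothetical usage on a cluster where the checkpoint is mounted locally:
# print(checkpoint_status("/dbfs/mnt/output_path/_checkpoint"))
```

If both values stay at 63 while the progress report shows batchId 64, the stream found no new input for batch 64 rather than failing while writing it.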

If I run only the readStream part, it correctly reads the entire list of files (and starts from a new batchId: 0). The strangest part is that I have absolutely no idea what causes it, or why it takes around 40-60 batches for the problem to appear.

The files are the output of an Azure Function that subscribes to Azure Service Bus. The flow is: Service Bus Queue (AVRO) --> Azure Function (AVRO --> JSON) --> Azure Blob Storage --> Delta Table. Each file has a timestamp in its name, so no two files share a name and there should be no overwrites or updates.

I rewrote the job in Scala with the new trigger, .trigger(Trigger.AvailableNow), and got the same result after just 34 batches. What is going on? My checkpoint folder is in the same folder as my Delta table (which is on Data Lake Gen2).

From logs it seems like rocksdb does not see the new files for some reason:

WARN FileEventBackfiller: The directory to backfill is empty: dbfs:/mnt/input_path

Every time, I get no error, nothing. It just gets stuck at some point. I tried moving the checkpoint folder to a different location, mounted on a different storage account. Same result. I have a feeling that the problem lies in rocksdb, but there is very scarce documentation available. Looks like I won a reverse jackpot: nobody else is having the same problem! Can you please help me? Can you suggest any alternatives to Auto Loader?

Thank you so much!

1 ACCEPTED SOLUTION


Hubert-Dudek
Esteemed Contributor III

Auto Loader has two modes of operation: "Directory listing" and "File notification". I recommend file notification, as it creates an event queue for new files. It is also more efficient (and cheaper, as we avoid list operations on Azure) because there is no need to scan the directory. You can also create your own Event Grid subscription to have full control, but for a first test it is enough to change:

.option("cloudFiles.useNotifications","true")
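
To show where that option sits in the stream definition, here is a minimal sketch that builds the reader options as a plain dictionary. The helper name notification_options is hypothetical; input_path and my_schema are the names used in the question. On Azure, notification mode usually also needs credentials that allow Auto Loader to set up the Event Grid subscription and queue, which this sketch leaves out.

```python
def notification_options(file_format="json"):
    """Reader options for Auto Loader in file notification mode (sketch)."""
    return {
        "cloudFiles.format": file_format,
        # Switch from directory listing to file notification mode.
        "cloudFiles.useNotifications": "true",
    }


# Hypothetical usage on a Databricks cluster:
# stream = (spark.readStream
#           .format("cloudFiles")
#           .options(**notification_options())
#           .schema(my_schema)
#           .load(input_path))
```

Keeping the options in one dictionary makes it easy to switch between listing and notification mode without touching the rest of the job.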


5 REPLIES

User15813097110
New Contributor III

Hi, have you tried configuring "cloudFiles.backfillInterval"?

Default value: None

But you can set it accordingly based on your use case.

Please note this is only available in Databricks Runtime 8.4 and above. 

Please file a support case for us to investigate this further.

Configuring "cloudFiles.backfillInterval" does a full scan on every backfill run. Is there a way to limit the scan to recent files after the first backfill run? I tried the maxFileAge and modifiedAfter options, which don't seem to help.

Maksym
New Contributor III

I resolved it by using

.option('cloudFiles.useIncrementalListing', 'false')

Now, if I understand correctly, rocksdb reads the whole list of files instead of relying on its mini "checkpoints" based on filenames and timestamps. My guess is this: my JSON filenames are composed of flow_name + timestamp, like this:

flow_name_2022-01-18T14-19-50.018Z.json

Maybe some timestamps make it go into a non-existent directory because of the dots? This is the only explanation that I have. I will test reverting to incremental listing once I change the filenames to something more neutral.
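
Another possibility worth checking, offered only as a guess: as far as the Auto Loader documentation describes it, incremental listing relies on file names arriving in lexical order. If several flow names write into the same folder, a file that is newer in time can still sort before files that were already ingested. The sketch below tests a list of names for that situation; the regular expression is an assumption based on the single example name above, and the function names are hypothetical.

```python
import re
from datetime import datetime

# Assumed pattern: <flow_name>_<YYYY-MM-DD>T<HH-MM-SS>.<mmm>Z.json
NAME_RE = re.compile(
    r"^(?P<flow>.+)_(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.\d{3})Z\.json$"
)


def parse_timestamp(filename):
    """Extract the timestamp embedded in a file name."""
    match = NAME_RE.match(filename)
    if match is None:
        raise ValueError(f"unexpected file name: {filename}")
    return datetime.strptime(match.group("ts"), "%Y-%m-%dT%H-%M-%S.%f")


def out_of_order(filenames):
    """Return neighbouring pairs (in lexical order) whose timestamps go backwards."""
    ordered = sorted(filenames)
    return [
        (a, b)
        for a, b in zip(ordered, ordered[1:])
        if parse_timestamp(b) < parse_timestamp(a)
    ]
```

An empty result means lexical order and time order agree for the names checked. Any pair returned is a case where a lexically later file is older than its neighbour, which is the kind of layout that may not suit incremental listing.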

Thank you so much for your suggestions!

lassebe
New Contributor II

I had the same issue: files would randomly not be loaded.

Setting `.option("cloudFiles.useIncrementalListing", False)` seemed to do the trick!
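
For anyone who wants to try the directory-listing settings mentioned in this thread together, here is a minimal sketch that collects them in one helper. The helper name listing_options is hypothetical, and the "1 day" interval is only an illustrative value; the option keys are the ones quoted in the replies.

```python
from typing import Optional


def listing_options(incremental="false", backfill_interval: Optional[str] = None):
    """Reader options for Auto Loader in directory listing mode (sketch)."""
    if incremental not in ("auto", "true", "false"):
        raise ValueError("incremental must be 'auto', 'true' or 'false'")
    opts = {"cloudFiles.useIncrementalListing": incremental}
    if backfill_interval is not None:
        # Periodic full listing to pick up files that were missed.
        opts["cloudFiles.backfillInterval"] = backfill_interval
    return opts


# Hypothetical usage on a Databricks cluster:
# reader = spark.readStream.format("cloudFiles").options(**listing_options())
```

Note that a full listing on every run costs more list operations as the folder grows, so it is worth watching run times after switching.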