cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Databricks Autoloader is getting stuck and does not pass to the next batch

Maksym
New Contributor III

I have a simple job scheduled every 5 min. Basically it listens to cloudfiles on storage account and writes them into delta table, extremely simple. The code is something like this:

df = (spark
  .readStream
  .format("cloudFiles")
  .option('cloudFiles.format', 'json')
  .load(input_path, schema = my_schema)
  .select(cols)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once = True)
  .start(output_path))

Sometimes there are new files, sometimes not. After 40-60 batches it gets stuck on one particular batchId, as if there are no new files in the folder. If i run the script manually i get the same result: it points to the last actually processed batch + 1, but does not write it to the checkpoint folder:

{
  "id" : "***,
  "runId" : "***",
  "name" : null,
  "timestamp" : "2022-01-13T15:25:07.512Z",
  "batchId" : 64,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,

Batch 64 does not exist, last batch written to checkpoint folder is 63!

If I run only the readStream part - it correctly reads the entire list of files ( and starts a new batchId: 0 ). The strangest part is: I have absolutely no Idea what causes it and why it takes around 40-60 batches to get this kind of error.

The files are output of Azure Function that is a subscriber of Azure Service Bus. Basically it goes like this: Service Bus Queue (AVRO) --> Azure Function (AVRO --> JSON) --> Azure Blob Storage --> Delta Table. Each file has a timestamp in its name so no 2 files with equal names and there should be no overwrites or update.

I remade the job in Scala with new tigger .trigger(Trigger.AvailableNow) and got the same result after just 34 batches. What is going on? My checkpoint folder is in the same folder as my delta table (which is Data Lake 2 gen)

From logs it seems like rocksdb does not see the new files for some reason:

WARN FileEventBackfiller: The directory to backfill is empty: dbfs:/mnt/input_path

Every time i got no error, nothing. It just getting stuck at some point. I tried to move checkpoint folder to a different location, mounted on a different storage account. Same result. I have a feeling the the problem lies in rocksdb, but there are very scarse documentation available. Looks like a won a reverse jackpot: nobody is having the same problem! Can you please help me? Suggest any alternatives to AutoLoader?

Thank you so much!

1 ACCEPTED SOLUTION

Accepted Solutions

Hubert-Dudek
Esteemed Contributor III

There are two ways of operating "Directory listing" and "File notification". I recommend file notification as it create even queue about new files. Also it is more effective (and cheaper as we avoid list operation on Azure) as there is no need to scan directory. You can also create own event grid to have full control but for first test is enough to change:

.option("cloudFiles.useNotifications","true")

View solution in original post

4 REPLIES 4

User15813097110
New Contributor III

Hi Have you tried configuring "cloudFiles.backfillInterval" ?

Default value: None

But you can set it accordingly based on your use case.

Please note this is only available in Databricks Runtime 8.4 and above. 

Please file a support case for us to investigate this further.

Hubert-Dudek
Esteemed Contributor III

There are two ways of operating "Directory listing" and "File notification". I recommend file notification as it create even queue about new files. Also it is more effective (and cheaper as we avoid list operation on Azure) as there is no need to scan directory. You can also create own event grid to have full control but for first test is enough to change:

.option("cloudFiles.useNotifications","true")

Maksym
New Contributor III

I resolved it by using

.option('cloudFiles.useIncrementalListing', 'false')

Now if I understand correctly, rocksdb reads the whole list of files instead of its mini "checkpoints" based on filename and timestamps. My guess is: my json filenames are composed of flow_name + timestamp, like this:

flow_name_2022-01-18T14-19-50.018Z.json

Maybe some timestamps make it go into non-existent directory because of dots? This is the only explanation that I have. I will test reverting back to incremental listing once i change filenames to something more neutral.

Thank you so much for your suggestions!

lassebe
New Contributor II

I had the same issue: files would randomly not be loaded.

Setting `.option("cloudFiles.useIncrementalListing", False)` Seemed to do the trick!

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.