
Autoloader works on compute cluster, but does not work within a task in workflows

96286
Contributor

I feel like I am going crazy with this. I tested a data pipeline on my standard compute cluster, loading new files as a batch from a Google Cloud Storage bucket. Autoloader works exactly as expected from my notebook on that cluster. I then used the same notebook as the first task in a workflow running on a new job cluster. To test the pipeline as a workflow, I first removed all checkpoint files and directories before starting the run, using this command:

dbutils.fs.rm(checkpoint_path, True)

For some reason the code works perfectly when testing, but in workflows I get "streaming stopped" and no data from Autoloader. Here is my Autoloader config:

from pyspark.sql.functions import input_file_name  # used for the filePath column

file_path = "gs://raw_zone_twitter"
table_name = "twitter_data_autoloader"
checkpoint_path = "/tmp/_checkpoint/twitter_checkpoint"

# Start from an empty table on every run.
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    # The schema location shares the checkpoint directory.
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
    .withColumn("filePath", input_file_name())
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    # Process the files currently available, then stop.
    .trigger(once=True)
    .toTable(table_name))

When running this as a workflow I see that the checkpoint directory is created, but there is no data inside.

The code is exactly the same (same notebook) whether I test on my compute cluster or run the task in my workflow, so I really have no idea why Autoloader is not working within my workflow...

4 REPLIES

96286
Contributor

@Vidula Khanna I see you have responded to previous Autoloader questions. Can you help me?

96286
Contributor

Still no progress on this. To confirm: the cluster configurations are identical for my notebook on the general purpose compute cluster and for my job cluster, and I am using the same GCP service account in both. On my compute cluster Autoloader works exactly as expected. Here is the code being used for Autoloader (this works on the compute cluster):

Screenshot 2023-05-22 at 17.43.40

However, when I run this exact same code (from the same notebook) as a job, Autoloader stops the stream (apparently at .writeStream) and I simply see "stream stopped" with no real clue as to why, as seen below.

Screenshot 2023-05-22 at 17.45.53

If I go to cloud storage, I see that my checkpoint location was created but the commits folder is empty, meaning Autoloader was unable to write the stream.

Screenshot 2023-05-22 at 17.50.55

If I run the notebook outside of workflows, the commits folder gets populated, and if I remove the dbutils.fs.rm(checkpoint_path, True) command, Autoloader correctly writes nothing until new files are available in the source bucket.
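
For anyone reproducing the commits-folder check described above, here is a minimal sketch. It assumes the checkpoint directory is reachable as an ordinary filesystem path, and it relies on Structured Streaming writing one file per completed batch, named by batch id, under commits/. The helper name committed_batches and the list_names parameter are hypothetical, introduced only for illustration.

```python
import os


def committed_batches(checkpoint_dir, list_names=os.listdir):
    """Return the sorted batch ids found under <checkpoint_dir>/commits.

    list_names is any callable that takes a directory path and returns the
    entry names inside it (hypothetical hook; defaults to os.listdir).
    An empty list means no batch was ever committed.
    """
    commits_dir = os.path.join(checkpoint_dir, "commits")
    try:
        names = list_names(commits_dir)
    except FileNotFoundError:
        # The checkpoint exists but the commits folder was never created.
        return []
    # Commit files are named by batch id (0, 1, 2, ...); skip anything else,
    # such as hidden checksum files.
    return sorted(int(name) for name in names if name.isdigit())
```

An empty result after a job run matches the symptom in this thread: the checkpoint location was created but no batch completed. On Databricks, a lister built on dbutils.fs.ls could be passed as list_names; that wiring is not shown here.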

96286
Contributor

Just to be clear, here are the configurations of my job cluster.

Screenshot 2023-05-22 at 18.16.53

96286
Contributor

I found the issue. I describe the solution in the following SO post. https://stackoverflow.com/questions/76287095/databricks-autoloader-works-on-compute-cluster-but-does...
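
The linked post is not reproduced here, so the following is not necessarily the fix it describes. One thing worth ruling out with this symptom is the notebook task finishing while the stream is still starting: .writeStream ... .toTable(table_name) returns a StreamingQuery right away, and nothing in the posted code waits for it. A minimal sketch of blocking on the query and surfacing any error, where wait_for_stream and timeout_s are hypothetical names:

```python
def wait_for_stream(query, timeout_s=600):
    """Block until a StreamingQuery terminates, then report how it ended.

    query is the object returned by .writeStream...toTable(...).
    timeout_s is an assumed upper bound in seconds, not a recommended value.
    """
    # With a timeout, awaitTermination returns True if the query terminated
    # within that time and False otherwise.
    finished = query.awaitTermination(timeout_s)
    if not finished:
        query.stop()
        raise TimeoutError(f"stream still running after {timeout_s} seconds")
    # exception() returns the error that stopped the query, or None.
    error = query.exception()
    if error is not None:
        raise error
    # lastProgress holds the most recent progress update, or None if the
    # query never reported one.
    return query.lastProgress
```

With the code from the question, this would be called as wait_for_stream(query) after the .toTable(table_name) line.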
