cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Autoloader works on compute cluster, but does not work within a task in workflows

96286
Contributor

I feel like I am going crazy with this. I have tested a data pipeline on my standard compute cluster. I am loading new files as batch from a Google Cloud Storage bucket. Autoloader works exactly as expected from my notebook on my compute cluster. Then, I simply used this notebook as a first task in a workflow using a new job cluster. In order to test this pipeline as a workflow I first removed all checkpoint files and directories before starting the run using this command.

dbutils.fs.rm(checkpoint_path, True)

For some reason, the code works perfectly when testing, but in workflows, I get "streaming stopped" and no data from autoloader. Here is my config for autoloader:

file_path = "gs://raw_zone_twitter"

table_name = f"twitter_data_autoloader"

checkpoint_path = f"/tmp/_checkpoint/twitter_checkpoint"

spark.sql(f"DROP TABLE IF EXISTS {table_name}")

query = (spark.readStream

.format("cloudFiles")

.option("cloudFiles.format", "text")

.option("cloudFiles.schemaLocation", checkpoint_path)

.load(file_path)

.withColumn("filePath", input_file_name())

.writeStream

.option("checkpointLocation", checkpoint_path)

.trigger(once=True)

.toTable(table_name))

When running this as a workflow I see that the checkpoint directory is created, but there is no data inside.

The code between testing on my compute cluster, and the task in my workflow is exactly the same (same notebook), so I really have no idea why autoloader is not working within my workflow...

1 ACCEPTED SOLUTION

Accepted Solutions

96286
Contributor
4 REPLIES 4

96286
Contributor

@Vidula Khanna​ I see you have responded to previous autoloader questions. Can you help me?

96286
Contributor

Still no progress on this. I want to confirm that my cluster configurations are identical in my notebook running on my general purpose compute cluster and my job cluster. Also I am using the same GCP service account. On my compute cluster autoloader works exactly as expected. Here is the code being used for autoloader (this works on compute cluster).

Screenshot 2023-05-22 at 17.43.40 

However, when I run this exact same code (from the same notebook) as a job autoloader stops the stream (it seems at .writeStream) and i simply see "stream stopped" with no real clue as to why, as seen below.

Screenshot 2023-05-22 at 17.45.53If I go to cloud storage I see that my checkpoint location was created, but the commits folder is empty, meaning autoloader was unable to write the stream.

Screenshot 2023-05-22 at 17.50.55If I run the notebook outside of workflows I see the commits folder gets populated, and if i remove the dbutils.fs.rm(checkpoint_path, True) command autoloader correctly does not write new files until new files are available in the source bucket.

96286
Contributor

Just to be clear, here are the configurations of my job cluster.

Screenshot 2023-05-22 at 18.16.53

96286
Contributor

I found the issue. I describe the solution in the following SO post. https://stackoverflow.com/questions/76287095/databricks-autoloader-works-on-compute-cluster-but-does...

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group