โ05-19-2023 12:49 AM
I feel like I am going crazy with this. I have tested a data pipeline on my standard compute cluster. I am loading new files as batch from a Google Cloud Storage bucket. Autoloader works exactly as expected from my notebook on my compute cluster. Then, I simply used this notebook as a first task in a workflow using a new job cluster. In order to test this pipeline as a workflow I first removed all checkpoint files and directories before starting the run using this command.
dbutils.fs.rm(checkpoint_path, True)
For some reason, the code works perfectly when testing, but in workflows, I get "streaming stopped" and no data from autoloader. Here is my config for autoloader:
file_path = "gs://raw_zone_twitter"
table_name = f"twitter_data_autoloader"
checkpoint_path = f"/tmp/_checkpoint/twitter_checkpoint"
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.withColumn("filePath", input_file_name())
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(once=True)
.toTable(table_name))
When running this as a workflow I see that the checkpoint directory is created, but there is no data inside.
The code between testing on my compute cluster, and the task in my workflow is exactly the same (same notebook), so I really have no idea why autoloader is not working within my workflow...
โ05-23-2023 07:09 AM
I found the issue. I describe the solution in the following SO post. https://stackoverflow.com/questions/76287095/databricks-autoloader-works-on-compute-cluster-but-does...
โ05-19-2023 03:42 AM
@Vidula Khannaโ I see you have responded to previous autoloader questions. Can you help me?
โ05-22-2023 08:54 AM
Still no progress on this. I want to confirm that my cluster configurations are identical in my notebook running on my general purpose compute cluster and my job cluster. Also I am using the same GCP service account. On my compute cluster autoloader works exactly as expected. Here is the code being used for autoloader (this works on compute cluster).
However, when I run this exact same code (from the same notebook) as a job autoloader stops the stream (it seems at .writeStream) and i simply see "stream stopped" with no real clue as to why, as seen below.
If I go to cloud storage I see that my checkpoint location was created, but the commits folder is empty, meaning autoloader was unable to write the stream.
If I run the notebook outside of workflows I see the commits folder gets populated, and if i remove the dbutils.fs.rm(checkpoint_path, True) command autoloader correctly does not write new files until new files are available in the source bucket.
โ05-22-2023 09:17 AM
โ05-23-2023 07:09 AM
I found the issue. I describe the solution in the following SO post. https://stackoverflow.com/questions/76287095/databricks-autoloader-works-on-compute-cluster-but-does...
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group