10-03-2024 10:07 AM
We are using Databricks in combination with Azure platforms, specifically working with Azure Blob Storage (Gen2). We frequently mount Azure containers in the Databricks file system and leverage external locations and volumes for Azure containers.
Our use case involves building several data pipelines in Databricks, and we are currently facing an issue with setting up a file arrival trigger. The goal is to trigger a workflow whenever a new file is dropped into an Azure Blob Storage container (Gen2), and we need to pass the complete file path to the subsequent processor in the workflow.
We would appreciate guidance on how to:
Any advice or best practices to solve this issue would be greatly appreciated!
Thank you for your time and assistance.
Best regards,
Baburam Shrestha
10-04-2024 03:17 AM
Hi @BaburamShrestha ,
1. To set up a file arrival trigger, you can follow the guides below:
- File Arrival Triggers in Databricks Workflows (linkedin.com)
- Trigger jobs when new files arrive | Databricks on AWS.
2. To capture the file path that triggered the event, I think you can try passing the following value as a task-level parameter:
.
10-07-2024 01:52 AM
Thanks for your comment @szymon_dybczak
Yes, these articles are amazing.
Another concern is that I need the full file path, including the file name, of the file that fired the file arrival trigger.
If there are thousands of files in the folder where the trigger is set up and multiple files arrive at that location, listing the folder is not a good way to find the latest file so that the rest of the workflow can process it.
10-05-2024 12:23 PM
@szymon_dybczak May I know what the best way would be to do this for around 2000 tables? I notice that out of 15000 tables, 2000 Delta tables receive files in the bronze layer very rarely. We usually run streaming for all 15000 tables throughout the day. Now I would like to tune this setup by taking out these 2000 infrequently updated tables. The file locations would be different for each of them. I would appreciate your thoughts.
10-14-2024 01:34 AM
Hi All,
It seems that `{{job.trigger.file_arrival.location}}` returns only the parent folder, not the actual path including the file name. The documentation does not specify this and is quite vague about it.
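For anyone wiring this up, here is a minimal sketch of how the value above might be passed to a notebook task in a job definition. It assumes the Jobs API 2.1 settings layout (`trigger.file_arrival.url`, `notebook_task.base_parameters`). The job name, task key, parameter name `file_arrival_location` and the placeholder paths are hypothetical, and compute settings are left out for brevity.

```python
import json


def build_job_settings(storage_url: str, notebook_path: str) -> dict:
    """Build a job settings payload with a file arrival trigger (sketch only)."""
    return {
        "name": "file-arrival-example",  # hypothetical job name
        "trigger": {
            "pause_status": "UNPAUSED",
            # Folder the trigger watches for new files
            "file_arrival": {"url": storage_url},
        },
        "tasks": [
            {
                "task_key": "process_new_files",  # hypothetical task key
                "notebook_task": {
                    "notebook_path": notebook_path,
                    "base_parameters": {
                        # Resolved at run time; per this thread it yields the
                        # watched folder, not the individual file name
                        "file_arrival_location": "{{job.trigger.file_arrival.location}}",
                    },
                },
            }
        ],
    }


settings = build_job_settings(
    "abfss://<container>@<storage_account>.dfs.core.windows.net/<folder>/",
    "/Workspace/<path_to_notebook>",
)
print(json.dumps(settings, indent=2))
```

Inside the notebook, the parameter could then be read with `dbutils.widgets.get("file_arrival_location")`, which, as noted above, gives the folder rather than the file.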
This is understandable if a Spark engine is doing the processing (so folders matter, not files), but in the MLOps use case I would also need the file name (full path). So this is a similar request to @BaburamShrestha's.
@szymon_dybczak Do you know if there is any other way to retrieve the file name from the trigger, similar to what you can do in Azure Data Factory? Create event-based triggers - Azure Data Factory & Azure Synapse Analytics | Microsoft Learn
BTW, congrats on your Databricks Champion status.
10-14-2024 04:49 AM
Hi @BaburamShrestha,
We inquired about this a few days ago and checked with Databricks. They were working on the issue, but no ETA was provided. You can find more details here: Databricks Community Link.
However, to address this use case, we followed the steps below:
I hope this helps.
10-15-2024 02:04 AM
Hi @Panda,
Many thanks for your answer.
It is a feasible solution, but the last time I checked, you needed to keep your cluster running for Auto Loader to work.
Can you use the file_arrival trigger to start the cluster with Auto Loader running? I think Auto Loader will pick up any changes, as it uses streaming and checkpointing internally.
Did you get it to work in the way described?
10-15-2024 03:26 AM - edited 10-15-2024 03:52 AM
Yes, we are now following this approach as an alternative solution, which combines Auto Loader with the Databricks file arrival trigger. In this case, you don't need to keep a cluster running all the time; instead, use the Auto Loader configuration below. Pass the following parameter in the Auto Loader configuration so that the stream processes the files available when the job is triggered and then stops.
Parameter to use in Autoloader config:
.trigger(availableNow=True)
availableNow: When set to True, it processes all available data in multiple batches and then terminates the query. Note that only one trigger can be set. For more details, see the DataStreamWriter trigger documentation.
I hope this helps.
Sample Code:
# Auto Loader config example that I used (replace the <...> placeholders with your own values)
from pyspark.sql.functions import col, input_file_name

source_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", <source_format>)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.useIncrementalListing", "false")
    .option("cloudFiles.schemaLocation", "/tmp/loader_logs/<unique_id_for_job>")
    .option("sep", <sep>)
    .schema(<source_schema>)
    .load(<abfs_path>)
    .withColumn("filename", input_file_name())  # full source path of each row's file
)

# Perform a series of transformations - just an example.
# ready_df must be derived from source_df so that it is still a streaming DataFrame.
ready_df = source_df.drop(col("employee"))  # just an example
# .........

# availableNow=True processes all files that are currently available, then stops the query
(
    ready_df.writeStream
    .option("checkpointLocation", "/tmp/loader/chkpoints/<destination_table_name>/")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(<destination_table_name>)
)
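Since the `filename` column in the sample holds the full source path for each row, a downstream step could split that path into its folder and file name. The following is only a sketch in plain Python: the helper `split_abfss_path` and the example path are hypothetical, and it assumes paths of the form `abfss://<container>@<account>.dfs.core.windows.net/<dir>/<file>`.

```python
import posixpath
from urllib.parse import unquote, urlsplit


def split_abfss_path(path: str) -> dict:
    """Split an abfss:// URI into container, account host, directory and file name."""
    parts = urlsplit(path)
    # The authority part looks like "<container>@<account>.dfs.core.windows.net"
    container, _, account_host = parts.netloc.partition("@")
    # Decode percent-encoded characters (for example %20) before splitting
    directory, file_name = posixpath.split(unquote(parts.path))
    return {
        "container": container,
        "account_host": account_host,
        "directory": directory,
        "file_name": file_name,
    }


# Hypothetical example path
info = split_abfss_path(
    "abfss://raw@myaccount.dfs.core.windows.net/landing/2024/orders.csv"
)
print(info["directory"], info["file_name"])
```

Collecting the distinct values of `filename` from each batch and passing them through a helper like this would give the per-file paths that the trigger itself does not provide.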