
File Arrival Trigger

BaburamShrestha
New Contributor II

We are using Databricks in combination with Azure platforms, specifically working with Azure Blob Storage (Gen2). We frequently mount Azure containers in the Databricks file system and leverage external locations and volumes for Azure containers.

Our use case involves building several data pipelines in Databricks, and we are currently facing an issue with setting up a file arrival trigger. The goal is to trigger a workflow whenever a new file is dropped into an Azure Blob Storage container (Gen2), and we need to pass the complete file path to the subsequent processor in the workflow.

We would appreciate guidance on how to:

  1. Set up a file arrival trigger in Databricks for Azure Blob Storage (Gen2).
  2. Capture the file path that triggered the event and pass it as a parameter to the next task in the pipeline.

Any advice or best practices to solve this issue would be greatly appreciated!

Thank you for your time and assistance.

Best regards,
Baburam Shrestha

7 REPLIES

szymon_dybczak
Contributor III

Hi @BaburamShrestha ,

1. To set up a file arrival trigger, you can follow the guides below:

File Arrival Triggers in Databricks Workflows (linkedin.com)

Trigger jobs when new files arrive | Databricks on AWS.

2. To capture the file path that triggered the event, I think you can try passing the following value as a task-level parameter:

 

[Screenshot: szymon_dybczak_0-1728037063209.png — the task parameter value `{{job.trigger.file_arrival.location}}`]
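
To make the parameter mechanics concrete, here is a minimal sketch (the parameter name `trigger_location` and the task wiring are my own assumptions, not something from the guides above): the triggered job task receives the dynamic value `{{job.trigger.file_arrival.location}}` as a base parameter, and the notebook reads it back with `dbutils.widgets`.

```
# Hypothetical task parameter configured on the triggered job task:
#   "base_parameters": {"trigger_location": "{{job.trigger.file_arrival.location}}"}

# Inside the triggered notebook, read the parameter back:
dbutils.widgets.text("trigger_location", "")  # declare the widget so the job parameter binds to it
trigger_location = dbutils.widgets.get("trigger_location")

print(f"File arrival trigger fired for storage location: {trigger_location}")
# Note: as discussed further down in this thread, this value resolves to the monitored
# storage location (a folder), not the full path of the individual file that arrived.
```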

Thanks for your comment @szymon_dybczak 

Yes, these articles are amazing.

Another concern is that I need the file path, including the file name, of the file that fired the arrival trigger.

If there are thousands of files inside the folder where the trigger was set up, and multiple files arrive at that location at once, listing the files is not a good option for finding the latest one so that we can run the remaining steps of the workflow.

noorbasha534
Contributor

@szymon_dybczak May I know what would be the best way to do this for around 2,000 tables? I notice that out of 15,000 tables, 2,000 Delta tables receive files in the bronze layer very rarely. We usually run streaming for all 15,000 tables throughout the day. Now I would like to tune this setup by taking these 2,000 infrequently updated tables out of the always-on streaming. The file locations are different for each of them. I appreciate your thoughts.

KrzysztofPrzyso
New Contributor III

Hi All,

It seems that `{{job.trigger.file_arrival.location}}` returns only the parent folder, not the actual path including the file name. The documentation does not actually specify this; it is quite vague on the point.

This is understandable if a Spark engine is doing the processing (so folders matter, not individual files), but in my MLOps use case I also need the file name (the full path). So it is a similar request to @BaburamShrestha's.

 

 
This is a dodgy workaround that finds the file with the latest modification time, but it cannot be fully trusted when multiple files arrive and are processed in parallel.
```
file_path = dbutils.widgets.get("file_path")
if file_path.endswith("/"):
    # Get the file with the latest modification time
    files = dbutils.fs.ls(file_path)
    latest_file = max(files, key=lambda file: file.modificationTime)

    # Print the file name and modification time
    print(f"Latest file: {latest_file.name}")
    print(f"Modification time: {latest_file.modificationTime}")
   
    # Read the file (if needed)
    file_path = latest_file.path.replace('dbfs:/', '/')
```

@szymon_dybczak Do you know if there is any other way to retrieve the file name from the trigger, similar to what you can do in Azure Data Factory? Create event-based triggers - Azure Data Factory & Azure Synapse Analytics | Microsoft Learn
BTW, congrats on your Databricks Champion status.

 

Panda
Valued Contributor

Hi @BaburamShrestha,

We inquired about this a few days ago and checked with Databricks. They were working on the issue, but no ETA was provided. You can find more details here: Databricks Community Link.

However, to address this use case, we followed the steps below:

  1. Configure Auto Loader with directory listing.
  2. Capture the File Path: Use the `_metadata` column (or `input_file_name()`) to capture the file path of the newly arrived file:
    df_with_path = df.withColumn("input_file_path", input_file_name())
  3. Pass the File Path to the Next Task: Once the file path is captured, pass it to the next task in the pipeline using the appropriate workflow or task parameter mechanism (one possible approach is sketched below).
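
To illustrate steps 2 and 3, here is a minimal sketch, assuming two notebook tasks in the same job (the task key `capture_file`, the table name, the paths, and the task-value key are all hypothetical) and using the `_metadata.file_path` column plus the jobs task values utility to hand the path(s) downstream:

```
from pyspark.sql.functions import col

# --- First task's notebook (hypothetical task_key: "capture_file") ---
# Auto Loader stream that records each record's source file path via the _metadata column.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")                              # assumption: JSON source
    .option("cloudFiles.schemaLocation", "/tmp/demo/schema")          # hypothetical path
    .load("abfss://container@account.dfs.core.windows.net/landing/")  # hypothetical path
    .withColumn("input_file_path", col("_metadata.file_path"))
)

# Process one incremental batch and wait for it to finish.
(
    df.writeStream
    .option("checkpointLocation", "/tmp/demo/checkpoint")             # hypothetical path
    .trigger(availableNow=True)
    .toTable("bronze.landing_table")                                  # hypothetical table
    .awaitTermination()
)

# Publish the distinct source paths recorded so far for the next task
# (filtering down to only the newest files is omitted in this sketch).
paths = [
    r.input_file_path
    for r in spark.table("bronze.landing_table").select("input_file_path").distinct().collect()
]
dbutils.jobs.taskValues.set(key="input_file_paths", value=paths)

# --- Downstream task's notebook (depends on "capture_file") ---
file_paths = dbutils.jobs.taskValues.get(
    taskKey="capture_file", key="input_file_paths", default=[], debugValue=[]
)
print(f"Files to process: {file_paths}")
```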

I hope this helps. 

Hi @Panda

Many thanks for your answer.

It is a feasible solution, but last time I checked you need a running cluster for Auto Loader to work.

Can you use a file arrival trigger to start the cluster that runs Auto Loader? I think Auto Loader will pick up any changes, since internally it uses streaming / checkpointing.

Did you get it to work in the described way?

 

Panda
Valued Contributor

@KrzysztofPrzyso 

Yes, we are now following this approach as an alternative solution, which combines Auto Loader with a Databricks file arrival trigger. In this case, you don't need to keep a cluster running all the time; instead, use the Auto Loader config below. Pass the following parameter in the Auto Loader configuration, which ensures that when the trigger starts the job, the stream processes the available data and then stops.

Parameter to use in Autoloader config:

 

.trigger(availableNow=True)

 

availableNow: When set to True, the query processes all available data in multiple batches and then terminates. Note that only one trigger type can be set. For more details, refer to the DataStreamWriter Trigger documentation.

I hope this helps. 

Sample Code:

 

# Auto Loader config example (what I used)
from pyspark.sql.functions import col, input_file_name

source_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", <source_format>)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.useIncrementalListing", "false")
    .option("cloudFiles.schemaLocation", f"/tmp/loader_logs/<unique_id_for_job>")
    .option("sep", <sep>)
    .schema(<source_schema>)
    .load(<abfs_path>)
    .withColumn("filename", input_file_name())
)

# Perform a series of transformations on the streaming DataFrame - just an example
ready_df = source_df.drop(col("employee"))  # just an example
# ...

(
    ready_df.writeStream
    .option("checkpointLocation", f"/tmp/loader/chkpoints/<destination_table_name>/")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(<destination_table_name>)
)
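
For completeness, here is a rough sketch of the job-side wiring that pairs a file arrival trigger with the availableNow notebook above. The field names follow the Jobs API `trigger.file_arrival` settings, but the job name, paths, and task key are my own placeholders, so treat this as an illustration rather than a tested configuration.

```
# Hypothetical job settings, e.g. for POST /api/2.1/jobs/create or the equivalent UI fields.
job_settings = {
    "name": "autoloader_on_file_arrival",  # hypothetical job name
    "trigger": {
        "pause_status": "UNPAUSED",
        "file_arrival": {
            # External location path being monitored (hypothetical)
            "url": "abfss://container@account.dfs.core.windows.net/landing/",
            "min_time_between_triggers_seconds": 60,
            "wait_after_last_change_seconds": 60,
        },
    },
    "tasks": [
        {
            "task_key": "autoloader_ingest",  # hypothetical task key
            "notebook_task": {
                "notebook_path": "/Repos/etl/autoloader_ingest",  # hypothetical notebook path
                "base_parameters": {
                    "trigger_location": "{{job.trigger.file_arrival.location}}"
                },
            },
            # Compute settings (new_cluster / existing_cluster_id / job_cluster_key) omitted for brevity.
        }
    ],
}
```

Because the notebook uses `.trigger(availableNow=True)`, each file arrival starts the job, Auto Loader processes whatever is new according to its checkpoint, and the query then terminates, so no always-on cluster is needed.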

 

 
