
File trigger options -- cloudFiles.allowOverwrites

srinivas_001
New Contributor III

I have a job configured to run on file arrival, with the path provided as
File arrival path: s3://test_bucket/test_cat/test_schema/

When a new parquet file arrives at this path, the job triggers automatically and processes the file.

However, when reloading a file, i.e. overwriting the existing file by uploading the same file again (with the same name) to this path, no run is triggered.
(I am not worried about duplicating the data; I just need the job to trigger.)
Code as below:
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("inferSchema", "false")
    .option("cloudFiles.allowOverwrites", "true")   # expect overwritten files to be picked up
    .option("cloudFiles.schemaLocation", "checkpoint_dir")
    .load(data_source)
)

Do I need to enable any other settings in order to trigger the job?

2 REPLIES

Kaniz
Community Manager

Hi @srinivas_001, let's explore the configuration options related to triggering your job when reloading files using Auto Loader in Spark Structured Streaming.

  1. cloudFiles.allowOverwrites: You've already set this option to true, which allows input directory file changes to overwrite existing data. However, there are a few caveats regarding enabling this configuration; please refer to the Auto Loader FAQ for details. Make sure you're using Databricks Runtime 7.6 or above to take advantage of this setting.

  2. cloudFiles.schemaLocation: This option specifies the location to store the inferred schema and subsequent changes. It's required when inferring the schema. You've set it to "checkpoint_dir" in your code snippet, which is a good practice. Ensure that the checkpoint directory is correctly configured and accessible.

  3. cloudFiles.includeExistingFiles: By default, this option is set to true. It determines whether to include existing files in the stream processing input path or only process new files arriving after the initial setup. Note that this option is evaluated only when you start a stream for the first time. Changing it after restarting the stream has no effect.

  4. cloudFiles.inferColumnTypes: If you're leveraging schema inference, set this option to true. By default, columns are inferred as strings when inferring JSON and CSV datasets. Enabling this ensures that exact column types are inferred.

  5. cloudFiles.maxBytesPerTrigger: This option limits the maximum number of new bytes to be processed in every trigger. You can specify a byte string (e.g., "10g" for 10 GB) to control the size of each microbatch. Keep in mind that this is a soft maximum, and Databricks processes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. (A combined sketch of these options follows this list.)
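
Here is a minimal sketch that combines the options above; the schema location path and the byte limit are placeholders rather than values from your setup, and cloudFiles.inferColumnTypes mainly matters for JSON/CSV sources (parquet carries its own schema):

# Placeholder paths and limits; adjust to your environment
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.allowOverwrites", "true")        # reprocess files overwritten in place (DBR 7.6+)
    .option("cloudFiles.includeExistingFiles", "true")   # evaluated only on the first stream start
    .option("cloudFiles.inferColumnTypes", "true")       # relevant when inferring JSON/CSV schemas
    .option("cloudFiles.maxBytesPerTrigger", "10g")      # soft cap on bytes per micro-batch
    .option("cloudFiles.schemaLocation", "s3://test_bucket/_schemas/")
    .load("s3://test_bucket/test_cat/test_schema/")
)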

Remember to verify your overall job configuration, including the checkpoint location, and ensure that your data source path (data_source) points to the correct S3 location. If you've covered the points mentioned above, your job should trigger appropriately when reloading files.
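
For completeness, a hedged sketch of the write side is below; the target table name and checkpoint path are placeholders. trigger(availableNow=True) is a common choice for file-arrival-triggered jobs because the run processes the newly available files and then stops:

# Table name and checkpoint path are placeholders
(
    df.writeStream
    .option("checkpointLocation", "s3://test_bucket/_checkpoints/test_table/")
    .trigger(availableNow=True)   # process available files, then finish the run
    .toTable("test_cat.test_schema.test_table")
)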

Happy streaming! 🚀

 

srinivas_001
New Contributor III

Hi Kaniz,

Thank you for the response.
I am using Databricks Runtime 11.3 and have also checked the checkpoint and data source locations, which are properly configured. Still, I am unable to trigger the job.

NOTE: Incoming files are pushed to the AWS S3 location from Apache Airflow with the REPLACE option set to TRUE.
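
For reference, the Airflow upload presumably looks something like the sketch below (the hook usage, paths, and key names are assumptions, since the DAG is not shown here); with replace=True the object is overwritten under the same key, which is exactly the situation cloudFiles.allowOverwrites is meant to handle:

# Hypothetical sketch of the Airflow-side upload; names and paths are assumptions
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_parquet():
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename="/tmp/test_file.parquet",               # local file produced upstream (placeholder)
        key="test_cat/test_schema/test_file.parquet",    # same key on every run
        bucket_name="test_bucket",
        replace=True,                                    # overwrite the existing object in place
    )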
