Databricks Autoloader - list only new files in an s3 bucket/directory

esalohs
New Contributor III

I have an S3 bucket with a couple of subdirectories/partitions like s3a://Bucket/dir1/ and s3a://Bucket/dir2/. There are currently millions of files sitting in the bucket across the various subdirectories/partitions. I'm getting new data in near real time to this bucket via an S3 bucket sync. My question is the following:

I'd like to utilize Auto Loader, and I only care about the new files that are synced to this bucket. I assume the existing files would have to be logged to RocksDB, but I don't really care about what's currently in there. I'd like to:

Every 15 minutes, run a job that picks up all the files synced to the bucket in the last 15 minutes and returns just the file names (I don't necessarily care about the file content, only the name). Since listing everything already in the bucket with dbutils.fs.ls() would time out or run into memory issues, I'd like to grab only the incremental batch of files every 15 minutes. I've read the Auto Loader documentation and am thinking of something like the code below:

from datetime import datetime, timedelta
from pyspark.sql import functions as f

today = datetime.today()
fifteen_min_ago = (today - timedelta(minutes=15)).strftime('%Y-%m-%d %H:%M:%S')

read_options_prelanding = {
    'cloudFiles.format': 'text',
    'cloudFiles.maxFilesPerTrigger': 10000,
    'cloudFiles.includeExistingFiles': 'false',  # skip files already in the bucket
    'checkpointLocation': checkpoint_path,
    'recursiveFileLookup': 'true',
    'modifiedAfter': fifteen_min_ago             # ignore files older than the last window
}
result_list = []

def process_batch(batch_df, batch_id):
  # the stream below only exposes a single 'filename' column
  new_file_names = batch_df.select('filename').rdd.flatMap(lambda x: x).collect()
  for file_name in new_file_names:
    print(f'New file: {file_name}')
    result_list.append(file_name)

autoloader = spark.readStream\
  .format('cloudFiles')\
  .options(**read_options_prelanding)\
  .schema('value STRING')\
  .load(autoload_path_prelanding)\
  .withColumn('filename', f.input_file_name())\
  .select('filename')

query = autoloader.writeStream\
  .option('checkpointLocation', checkpoint_path)\
  .option('ignoreMissingFiles','true')\
  .trigger(availableNow=True)\
  .foreachBatch(process_batch)\
  .start()

My cluster isn't set up for SNS, so I don't think I can use the cloud notifications option. Will this work? I'm confused about whether I need to do an initial, one-time stream/scan of the bucket first to commit all the current files to RocksDB, and then run this every 15 minutes to get only the new files. Any help is appreciated!

7 REPLIES

Kaniz
Community Manager

Hi @esalohs, certainly! It looks like you're on the right track with using Auto Loader to ingest new files from your S3 bucket. Let's break down your approach and make sure everything aligns with your requirements:

 

Auto Loader Configuration:

  • You’ve set up the cloudFiles.format to 'text', which is appropriate if your files are in plain text format.
  • The cloudFiles.maxFilesPerTrigger parameter is set to 10,000, which means that Auto Loader will process up to 10,000 files per trigger. Adjust this value based on your specific use case.
  • You’ve correctly set cloudFiles.includeExistingFiles to 'false', ensuring that only new files are processed.
  • The modifiedAfter parameter is set to capture files modified within the last 15 minutes.

Processing New Files:

  • Your process_batch function extracts the filenames from the batch DataFrame.
  • You’re printing the new filenames and appending them to a result_list.

Auto Loader Stream Setup:

  • You’re reading data from the autoload_path_prelanding using Auto Loader.
  • The withColumn('filename', f.input_file_name()) line adds a new column called 'filename' containing the full path of each file.
  • Finally, you select only the 'filename' column.

Write Stream Configuration:

  • You’ve set the checkpoint location to checkpoint_path.
  • The ignoreMissingFiles option is set to 'true', which means that if a file is deleted or missing, it won’t cause the stream to fail.

Scheduling:

  • To run this job every 15 minutes, you can create a Databricks Job with a schedule set to execute at the desired interval (e.g., every 15 minutes); one way to set that up through the Jobs API is sketched below.
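For reference, here is a minimal sketch of creating such a schedule through the Jobs API 2.1. The workspace URL, token, notebook path, and cluster ID are placeholders, not values from this thread:

import requests

host = 'https://<your-workspace>.cloud.databricks.com'   # placeholder workspace URL
token = '<personal-access-token>'                        # placeholder PAT

job_spec = {
    'name': 'autoloader-incremental-file-list',
    'tasks': [{
        'task_key': 'ingest',
        'notebook_task': {'notebook_path': '/Users/<you>/autoloader_ingest'},  # placeholder notebook
        'existing_cluster_id': '<cluster-id>',                                 # placeholder cluster
    }],
    # Quartz cron: fire at minutes 0, 15, 30 and 45 of every hour
    'schedule': {
        'quartz_cron_expression': '0 0/15 * * * ?',
        'timezone_id': 'UTC',
        'pause_status': 'UNPAUSED',
    },
}

resp = requests.post(f'{host}/api/2.1/jobs/create',
                     headers={'Authorization': f'Bearer {token}'},
                     json=job_spec)
print(resp.json())  # returns the new job_id on success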

Remember to replace placeholders like checkpoint_path and autoload_path_prelanding with your actual paths. Additionally, ensure that your Spark session is properly configured for S3 access.
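If the cluster does not already have an instance profile with access to the bucket, a minimal sketch of key-based S3 configuration follows; the credentials are placeholders, and on Databricks an instance profile attached to the cluster is usually the preferred route and needs no code at all:

# Assumption: key-based auth; set the Hadoop S3A credentials on the active session.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', '<AWS_ACCESS_KEY_ID>')      # placeholder
hadoop_conf.set('fs.s3a.secret.key', '<AWS_SECRET_ACCESS_KEY>')  # placeholder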

 

Feel free to adjust any other parameters based on your specific requirements. Happy streaming! 🚀

esalohs
New Contributor III

Thanks @Kaniz! So I don't need to do an initial one-time scan of the entire bucket/directory to commit the existing files to RocksDB? I can just use this to grab the incremental loads every 15 minutes? Thanks again for the help 🙂

kulkpd
Contributor

@esalohs @Kaniz,
I tried a similar use case. My finding is that even with includeExistingFiles=false and the modifiedAfter configuration included, the very first execution of Auto Loader will still scan the entire S3 bucket and sync it into RocksDB; all later executions will be fast. Let me know what your findings are.

Consider the file-notification option instead of directory listing, since you only want to process the LATEST records.
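To make that concrete, here is a minimal sketch of the "seed once, then run incrementally" pattern described above, reusing the text-format options from the original post; the paths are placeholders, not values from this thread:

# Sketch only: the first run with a fresh checkpoint pays the full listing cost
# (even with includeExistingFiles='false', per the finding above); every later run
# that reuses the SAME checkpointLocation only sees files added since the last run.
seed_stream = (spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'text')
    .option('cloudFiles.includeExistingFiles', 'false')
    .schema('value STRING')
    .load('s3a://Bucket/'))                                       # placeholder path

(seed_stream.writeStream
    .option('checkpointLocation', '/tmp/checkpoints/prelanding')  # keep this fixed across runs
    .trigger(availableNow=True)                                   # process what is new, then stop
    .foreachBatch(lambda batch_df, batch_id: None)                # run #1: discard; later runs: handle new files
    .start())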


esalohs
New Contributor III

Hey @kulkpd - unfortunately I don't have file notifications enabled on my cluster. I did the initial scan and it took about 5 days, as there were 30M files sitting in there. However, when I try to get the 'incremental' files, it's still taking a fair amount of time. Would you be willing to share the solution you came up with?

Thanks again 🙂 

kulkpd
Contributor

@esalohs Unfortunately, even if you specify includeExistingFiles as false, it's still going to do a full scan. Probably a bug on the Databricks side, @Kaniz? My use case was similar: process only the latest files, not the existing 150 million, but nothing worked out, and I had to spin up a big cluster for the initial execution to sync the 150M files. All subsequent executions were super fast.

esalohs
New Contributor III

Would you happen to still have your code/outline/options handy? That would be incredible if you did, @akul.

kulkpd
Contributor
Below are the options I used with spark.readStream:

spark.readStream
  .option('cloudFiles.format', 'json')
  .option('cloudFiles.inferColumnTypes', 'true')
  .option('cloudFiles.schemaEvolutionMode', 'rescue')
  .option('cloudFiles.useNotifications', True)
  .option('skipChangeCommits', 'true')
  .option('cloudFiles.fetchParallelism', 100)
  .option('cloudFiles.includeExistingFiles', False)
  .option('cloudFiles.maxFilesPerTrigger', 10000)
  .option('modifiedAfter', '2024-01-01 00:00:00.000000 UTC+0')
  .option('cloudFiles.region', 'aws-region')
  .option('cloudFiles.queueUrl', 'sqs-name')
  .option('cloudFiles.roleArn', 'role-name-optional-if-instance-profile-has-permission')
  .option('cloudFiles.roleSessionName', 'session-name')
  .option('cloudFiles.stsEndpoint', 'required-if-roleArn-being-used')
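For context, a hedged sketch of how options like these might plug into a complete file-notification stream; the bucket path, region, queue URL, checkpoint location, and table name are placeholders, not values from this thread:

# Sketch only: file-notification mode assumes an SQS queue already receiving
# S3 event notifications for the bucket; all names below are placeholders.
notified = (spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'json')
    .option('cloudFiles.useNotifications', 'true')
    .option('cloudFiles.includeExistingFiles', 'false')
    .option('cloudFiles.schemaLocation', '/tmp/checkpoints/notified/schema')  # placeholder
    .option('cloudFiles.region', 'us-east-1')                                 # placeholder region
    .option('cloudFiles.queueUrl', 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')  # placeholder
    .load('s3a://Bucket/'))                                                   # placeholder path

(notified.writeStream
    .option('checkpointLocation', '/tmp/checkpoints/notified')                # placeholder
    .trigger(availableNow=True)
    .toTable('landing.new_files'))                                            # placeholder table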