
Databricks Autoloader - list only new files in an s3 bucket/directory

esalohs
New Contributor III

I have an s3 bucket with a couple of subdirectories/partitions like s3a://Bucket/dir1/ and s3a://Bucket/dir2/. There are currently millions of files sitting in the bucket across the various subdirectories/partitions. I'm getting new data into this bucket in near real time via an S3 bucket sync. My question is the following:

I'd like to utilize Auto Loader, and I only care about the new files that are synced to this bucket. I assume the existing files would have to be logged to RocksDB, but I don't really care about what's currently in there. I'd like to:

Every 15 minutes, run a job that gets all the files synced to the bucket in the last 15 minutes and extracts just the file name (I don't necessarily care about the file contents, just the names). Since listing the bucket with dbutils.fs.ls() would time out or run into memory issues given all the files already in there, I'd like to pick up only the incremental batch of files every 15 minutes. I've read the Auto Loader documentation and am thinking of something like the below:

from datetime import datetime, timedelta
from pyspark.sql import functions as f

# Window for the incremental run (15 minutes, to match the job schedule)
today = datetime.today()
fifteen_min_ago = (today - timedelta(minutes=15)).strftime('%Y-%m-%d %H:%M:%S')

read_options_prelanding = {
    'cloudFiles.format': 'text',
    'cloudFiles.maxFilesPerTrigger': 10000,
    'cloudFiles.includeExistingFiles': 'false',
    'recursiveFileLookup': 'true',
    'modifiedAfter': fifteen_min_ago
  }

# Accumulates the new file names seen across micro-batches
result_list = []

def process_batch(batch_df, batch_id):
  new_file_names = [row['filename'] for row in batch_df.select('filename').collect()]
  for file_name in new_file_names:
    print(f'New file: {file_name}')
    result_list.append(file_name)

autoloader = spark.readStream\
  .format('cloudFiles')\
  .options(**read_options_prelanding)\
  .schema('value STRING')\
  .load(autoload_path_prelanding)\
  .withColumn('filename', f.input_file_name())\
  .select('filename')

query = autoloader.writeStream\
  .option('checkpointLocation', checkpoint_path)\
  .option('ignoreMissingFiles', 'true')\
  .trigger(availableNow=True)\
  .foreachBatch(process_batch)\
  .start()

My cluster isn't set up for SNS, so I don't think I can use the cloud notifications option. Will this work? I'm a bit confused about whether I need to do an initial, one-time stream/scan of the bucket first to get all the current files committed to RocksDB, and then run this every 15 minutes to get only the new files. Any help is appreciated!
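For what it's worth, below is roughly what I mean by an initial, one-time stream/scan: a "priming" run that lets Auto Loader discover the files already in the bucket and commit them to the checkpoint without processing their contents, before the 15-minute incremental job takes over. This is just a sketch using my placeholder names (autoload_path_prelanding, checkpoint_path), and I'm not sure it's actually needed:

# Hypothetical one-time 'priming' run: let Auto Loader list the files that are
# already in the bucket and commit them to the RocksDB checkpoint. The batch
# handler deliberately does nothing, so the file contents aren't processed.
# Note: cloudFiles.includeExistingFiles defaults to true, so existing files are picked up.
def noop_batch(batch_df, batch_id):
  pass  # intentionally ignore the existing files

priming_query = spark.readStream\
  .format('cloudFiles')\
  .option('cloudFiles.format', 'text')\
  .schema('value STRING')\
  .load(autoload_path_prelanding)\
  .writeStream\
  .option('checkpointLocation', checkpoint_path)\
  .trigger(availableNow=True)\
  .foreachBatch(noop_batch)\
  .start()

priming_query.awaitTermination()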


esalohs
New Contributor III

Thanks @Retired_mod ! So I don't need to do an initial one-time scan of the entire bucket/directory to commit the existing files to RocksDB? I can just use this to grab the incremental loads every 15 min? Thanks again for the help 🙂

kulkpd
Contributor

@esalohs @Retired_mod ,
I tried a similar use case. My finding is that even though includeExistingFiles=false and the modifiedAfter configuration are included, at the very first execution Auto Loader will scan the entire S3 bucket and sync up RocksDB; all later executions will be fast.
Let me know what your findings are.

Try considering the file-notification option instead of directory listing, since you want to process only the LATEST records.

esalohs
New Contributor III

Hey @kulkpd - unfortunately I don't have file notifications enabled on my cluster. I did the initial scan and it took about 5 days, as there were 30M files sitting in there. However, when I try to get the 'incremental' files it's still taking a fair amount of time. Would you be willing to share the solution you came up with?

Thanks again 🙂

kulkpd
Contributor

@esalohs Unfortunately, even if you specify includeExistingFiles as false, it's still going to do a full scan. Probably some bug on the Databricks side, @Retired_mod? My use case was similar: process only the latest files and not the existing 150 million, but nothing worked out, and I had to spin up a big cluster for the initial execution to sync the 150M files. All subsequent executions were super fast.

esalohs
New Contributor III

Would you happen to still have the code/outline/options you set handy? That would be incredible if you did @akul

kulkpd
Contributor
Below are the options I used when calling spark.readStream:

.option('cloudFiles.format', 'json')
.option('cloudFiles.inferColumnTypes', 'true')
.option('cloudFiles.schemaEvolutionMode', 'rescue')
.option('cloudFiles.useNotifications', True)
.option('skipChangeCommits', 'true')
.option('cloudFiles.fetchParallelism', 100)
.option('cloudFiles.includeExistingFiles', False)
.option('cloudFiles.maxFilesPerTrigger', 10000)
.option('modifiedAfter', '2024-01-01 00:00:00.000000 UTC+0')
.option('cloudFiles.region', 'aws-region')
.option('cloudFiles.queueUrl', 'sqs-name')
.option('cloudFiles.roleArn', 'role-name-optional-if-instance-profile-has-permission')
.option('cloudFiles.roleSessionName', 'session-name')
.option('cloudFiles.stsEndpoint', 'required-if-roleArn-being-used')
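
And for reference, here is a rough sketch of how those options plug into a full stream. Everything below is a placeholder rather than my actual job: the source path, the checkpoint and schema locations (I've added cloudFiles.schemaLocation since no explicit schema is passed), and the foreachBatch handler.

def process_batch(batch_df, batch_id):
  # Placeholder handler: just report how many records arrived in this micro-batch
  print(f'batch {batch_id}: {batch_df.count()} rows')

query = spark.readStream\
  .format('cloudFiles')\
  .option('cloudFiles.format', 'json')\
  .option('cloudFiles.inferColumnTypes', 'true')\
  .option('cloudFiles.schemaEvolutionMode', 'rescue')\
  .option('cloudFiles.schemaLocation', 's3a://bucket/_schemas/example')\
  .option('cloudFiles.useNotifications', True)\
  .option('skipChangeCommits', 'true')\
  .option('cloudFiles.fetchParallelism', 100)\
  .option('cloudFiles.includeExistingFiles', False)\
  .option('cloudFiles.maxFilesPerTrigger', 10000)\
  .option('modifiedAfter', '2024-01-01 00:00:00.000000 UTC+0')\
  .option('cloudFiles.region', 'aws-region')\
  .option('cloudFiles.queueUrl', 'sqs-name')\
  .option('cloudFiles.roleArn', 'role-name-optional-if-instance-profile-has-permission')\
  .option('cloudFiles.roleSessionName', 'session-name')\
  .option('cloudFiles.stsEndpoint', 'required-if-roleArn-being-used')\
  .load('s3a://bucket/source-path/')\
  .writeStream\
  .option('checkpointLocation', 's3a://bucket/_checkpoints/example')\
  .trigger(availableNow=True)\
  .foreachBatch(process_batch)\
  .start()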
