Auto Loader for copying files on s3

daan_dw
New Contributor III

Hey community,

I have a folder on S3 with around 5 million small files, and new files are added daily. I would like to simply copy those new files to another folder on S3. My approach is to use Auto Loader; the code is attached below. The code works, but it is too slow. Is there any way to speed up this process? Or is there another approach, without Auto Loader, that is faster?

Thanks a lot!

from pyspark.sql.functions import col, lit
import datetime

# Source and destination roots, plus the locations where the stream keeps
# its checkpoint and schema state.
source_path = "s3://"
destination_path = "s3://"
checkpoint_path = "/tmp/autoloader_checkpoints/test_job"
schema_path = "/tmp/autoloader_schemas/test_job"

# Reader options, applied in the order listed.
autoloader_options = {
    "cloudFiles.format": "binaryFile",
    "cloudFiles.schemaLocation": schema_path,
    "cloudFiles.includeExistingFiles": "false",
    "recursiveFileLookup": "true",
    "pathGlobFilter": "*",
    "cloudFiles.useNotifications": "true",
    "cloudFiles.region": "eu-central-1",
}

# Streaming DataFrame over the files found under source_path.
stream_df = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(source_path)
)

# Process each file while preserving directory structure
def copy_files(batch_df, batch_id, max_workers=32):
    """Copy every file listed in a micro-batch to the destination root.

    Each source path has the ``source_path`` prefix stripped and is
    re-rooted under ``destination_path``, so the relative directory layout
    is preserved. Failures are reported per file and do not abort the
    rest of the batch.

    Args:
        batch_df: Micro-batch DataFrame with at least a ``path`` column.
        batch_id: Micro-batch identifier supplied by ``foreachBatch``
            (unused).
        max_workers: Number of threads issuing copies concurrently.
    """
    # Function-scope import so this block stays self-contained.
    from concurrent.futures import ThreadPoolExecutor

    # Only the path is needed. Collecting the full rows would also ship the
    # binary `content` of every file to the driver.
    src_paths = [row["path"] for row in batch_df.select("path").collect()]
    if not src_paths:
        return

    def _copy_one(src_path):
        # src_path is bound before the try, so the error message below can
        # always name the file that failed.
        try:
            # Strip only the first occurrence (the root prefix), not every
            # place the same text might appear later in the key.
            relative_path = src_path.replace(source_path, "", 1)
            dest_path = destination_path + relative_path

            parent_dir = "/".join(dest_path.split("/")[:-1])
            dbutils.fs.mkdirs(parent_dir)
            dbutils.fs.cp(src_path, dest_path)
        except Exception as e:
            print(f"Failed to copy {src_path}: {str(e)}")

    # Each copy is a blocking remote call, so the work is I/O-bound and
    # threads let many copies be in flight at once.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Drain the iterator so every task has finished before returning.
        list(pool.map(_copy_one, src_paths))

# Project down to `path` before the sink: the batch handler only reads the
# file location, and dropping the binary `content` column here lets the
# reader avoid loading each file's bytes just to discard them.
# availableNow processes whatever is pending and then stops, so
# awaitTermination() returns once the backlog has been handled.
(stream_df.select("path").writeStream
    .foreachBatch(copy_files)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start()
    .awaitTermination())