05-08-2025 08:20 AM
Hey community,
I have a folder on S3 containing around 5 million small files, and new files are added daily. I would like to simply copy those new files to another folder on S3. My approach uses Auto Loader; the code is attached below. The code works but is too slow. Is there any way to speed up this process? Or is there another approach without Auto Loader that is faster?
Thanks a lot!
from pyspark.sql.functions import col, lit
import datetime
source_path = "s3://"
destination_path = "s3://"
checkpoint_path = "/tmp/autoloader_checkpoints/test_job"
schema_path = "/tmp/autoloader_schemas/test_job"
stream_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.includeExistingFiles", "false")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.region", "eu-central-1")
    .load(source_path))
# Process each file while preserving directory structure
def copy_files(batch_df, batch_id):
    for row in batch_df.collect():
        # Read the path before the try block so the error message can always reference it
        src_path = row['path']
        try:
            relative_path = src_path.replace(source_path, "")
            dest_path = destination_path + relative_path
            parent_dir = "/".join(dest_path.split("/")[:-1])
            dbutils.fs.mkdirs(parent_dir)
            dbutils.fs.cp(src_path, dest_path)
        except Exception as e:
            print(f"Failed to copy {src_path}: {str(e)}")
(stream_df.writeStream
    .foreachBatch(copy_files)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start()
    .awaitTermination())
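
One possible direction for a faster `copy_files`, offered as a rough sketch rather than a verified fix: the `binaryFile` source exposes a `content` column, so `batch_df.collect()` likely pulls every file's bytes to the driver, and the copies then run one at a time. The variant below collects only the `path` column and issues the copies from a thread pool. The helper names `to_dest_path` and `copy_many` and the `max_workers` value are made up for illustration, and calling `dbutils.fs.cp` from several driver threads is an assumption that would need testing on the actual cluster.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Tuple


def to_dest_path(src_path: str, source_root: str, dest_root: str) -> str:
    """Map a source object path to the destination, keeping the relative layout."""
    root = source_root.rstrip("/") + "/"
    if not src_path.startswith(root):
        raise ValueError(f"{src_path} is not under {source_root}")
    relative = src_path[len(root):].lstrip("/")
    return dest_root.rstrip("/") + "/" + relative


def copy_many(
    paths: Iterable[str],
    source_root: str,
    dest_root: str,
    copy_fn: Callable[[str, str], object],
    max_workers: int = 32,  # hypothetical default; tune for the workload
) -> List[Tuple[str, str]]:
    """Copy all paths concurrently and return (source path, error message) for failures."""

    def _copy_one(src: str):
        try:
            copy_fn(src, to_dest_path(src, source_root, dest_root))
            return None
        except Exception as exc:
            return (src, str(exc))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(_copy_one, paths))
    return [r for r in results if r is not None]


def copy_files(batch_df, batch_id):
    # Select only the path column so file contents are not collected to the driver.
    paths = [row["path"] for row in batch_df.select("path").collect()]
    # source_path, destination_path and dbutils come from the notebook above.
    failures = copy_many(paths, source_path, destination_path, dbutils.fs.cp)
    for src, err in failures:
        print(f"Failed to copy {src}: {err}")
```

This version also drops the `dbutils.fs.mkdirs` call, on the assumption that S3 object keys do not need parent directories to exist before a copy. The function can be passed to `.foreachBatch(copy_files)` exactly as before.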