05-08-2025 08:20 AM
Hey community,
I have a folder on S3 with around 5 million small files, and new files are added on a daily basis. I would like to simply copy those new files to another folder on S3. My approach is to use Auto Loader; I attached the code below. The code works but is too slow. Is there any way to speed up this process, or is there another approach without Auto Loader that is faster?
Thanks a lot!
from pyspark.sql.functions import col, lit
import datetime
source_path = "s3://"
destination_path = "s3://"
checkpoint_path = "/tmp/autoloader_checkpoints/test_job"
schema_path = "/tmp/autoloader_schemas/test_job"
stream_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.includeExistingFiles", "false")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.region", "eu-central-1")
    .load(source_path))

# Process each file while preserving directory structure
def copy_files(batch_df, batch_id):
    for row in batch_df.collect():
        try:
            src_path = row['path']
            relative_path = src_path.replace(source_path, "")
            dest_path = destination_path + relative_path
            parent_dir = "/".join(dest_path.split("/")[:-1])
            dbutils.fs.mkdirs(parent_dir)
            dbutils.fs.cp(src_path, dest_path)
        except Exception as e:
            print(f"Failed to copy {src_path}: {str(e)}")

(stream_df.writeStream
    .foreachBatch(copy_files)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start()
    .awaitTermination())
05-08-2025 01:47 PM - edited 05-08-2025 01:49 PM
You're facing a classic small files problem in S3, which is challenging to solve efficiently. Your current Auto Loader approach has performance limitations when processing millions of small files individually. Let me suggest several optimizations and alternative approaches to speed up this process.
Key Performance Issues
Your copy_files function copies files one at a time with dbutils.fs.cp, which adds a lot of per-file overhead.
batch_df.collect() pulls every file path onto the driver, creating memory pressure there.
Individual S3 operations have high latency, especially when performed sequentially.
Try the code below:
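The gist of it: instead of collect() plus copying on the driver, derive the destination path as a column and do the copy inside a UDF, so the copies run in parallel on the executors while the Auto Loader stream and foreachBatch wiring stay as they are. A condensed skeleton of that pattern (paths and names here are just illustrative, and the body of copy_single_file is where the actual copy call goes):
from pyspark.sql.functions import col, lit, concat, regexp_replace, udf
from pyspark.sql.types import StringType

source_path = "s3://source-bucket/prefix/"        # illustrative values
destination_path = "s3://dest-bucket/prefix/"

def copy_single_file(src_path, dest_path):
    # Perform the per-file copy here (e.g. an S3 server-side copy) and return a
    # status string; because this runs inside a UDF it executes on the executors.
    return "success"

copy_file_udf = udf(copy_single_file, StringType())

def process_batch(batch_df, batch_id):
    results = (batch_df
        # Derive the destination path as a column instead of looping on the driver
        .withColumn("destination_path",
                    concat(lit(destination_path),
                           regexp_replace("path", source_path, "")))
        # The copy runs in parallel, one task per partition of file paths
        .withColumn("copy_result",
                    copy_file_udf(col("path"), col("destination_path"))))
    # An action forces the copies to run; non-success rows can be inspected here
    results.filter(~col("copy_result").startswith("success")).count()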
05-12-2025 01:29 AM
Hey LRALVA
The first time I ran your code I got the error: PicklingError: Could not serialize object: Exception: You cannot use dbutils within a spark job or otherwise pickle it.
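From what I understand, the error is raised as soon as Spark has to serialize a function that references dbutils, because dbutils only exists on the driver and refuses to be pickled. Roughly this shape (illustrative only, not the exact code from the post above):
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def copy_with_dbutils(src_path, dest_path):  # hypothetical example of the failing pattern
    dbutils.fs.cp(src_path, dest_path)       # dbutils is driver-only and cannot be pickled
    return "success"

# Raises the PicklingError above once Spark serializes the UDF for the executors
copy_udf = udf(copy_with_dbutils, StringType())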
So I changed the copy_single_file function to the one below and now it works exactly as expected. Thanks for putting me on the right track!
from pyspark.sql.functions import col, lit, input_file_name, regexp_replace, regexp_extract, concat, udf
from pyspark.sql.types import BooleanType, StringType
from pyspark import SparkFiles
import os
import time
import boto3
source_path = ""
destination_path = ""
checkpoint_path = ""
schema_path = ""
spark.conf.set("spark.sql.files.maxPartitionBytes", "128m") # Smaller partitions for more parallelism
spark.conf.set("spark.sql.adaptive.enabled", "true") # Enable adaptive query execution
spark.conf.set("spark.default.parallelism", 100) # Adjust based on your cluster size
spark.conf.set("spark.sql.shuffle.partitions", 100) # Adjust based on your cluster size
spark.conf.set("spark.databricks.io.cache.enabled", "true") # Enable IO cache if on Databricks
def copy_single_file(src_path, dest_path):
    try:
        s3 = boto3.client('s3')

        # Strip 's3://' and split into bucket and key
        def split_s3_path(s3_path):
            s3_path = s3_path.replace("s3://", "")
            bucket = s3_path.split("/")[0]
            key = "/".join(s3_path.split("/")[1:])
            return bucket, key

        source_bucket, source_key = split_s3_path(src_path)
        dest_bucket, dest_key = split_s3_path(dest_path)

        s3.copy_object(
            CopySource={'Bucket': source_bucket, 'Key': source_key},
            Bucket=dest_bucket,
            Key=dest_key
        )
        return "success"
    except Exception as e:
        return f"error: {str(e)}"
copy_file_udf = udf(copy_single_file, StringType())
file_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")        # Read files in binary format
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.includeExistingFiles", "true")
    .option("recursiveFileLookup", "true")            # Search all subdirectories
    .option("pathGlobFilter", "*")                    # Process all file types
    .option("cloudFiles.useNotifications", "true")    # Use S3 notifications if available
    .option("cloudFiles.fetchParallelism", 64)        # Increase parallelism for listing files
    .option("cloudFiles.maxFilesPerTrigger", 10000)   # Process more files per batch
    .option("cloudFiles.region", "eu-central-1")
    .load(source_path)
    .select("path", "length", "modificationTime"))    # Only select needed columns to reduce memory
def process_batch(batch_df, batch_id):
    start_time = time.time()

    file_count = batch_df.count()  # count once and reuse below
    if file_count == 0:
        print(f"Batch {batch_id}: No files to process")
        return

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

    processed_df = (batch_df
        # Create destination path by replacing source path with destination path
        .withColumn("relative_path",
                    regexp_replace("path", source_path, ""))
        .withColumn("destination_path",
                    concat(lit(destination_path), col("relative_path")))
        # Apply the copy operation to each file in parallel
        .withColumn("copy_result",
                    copy_file_udf(col("path"), col("destination_path")))
    )

    optimal_partitions = min(max(file_count // 1000, 8), 128)  # Between 8-128 partitions
    result_df = processed_df.repartition(optimal_partitions).cache()

    success_count = result_df.filter(col("copy_result").startswith("success")).count()
    error_count = result_df.filter(col("copy_result").startswith("error")).count()

    if error_count > 0:
        errors_df = result_df.filter(col("copy_result").startswith("error"))
        print(f"Batch {batch_id}: Found {error_count} errors. Sample errors:")
        errors_df.select("path", "copy_result").show(10, truncate=False)

    duration = time.time() - start_time
    files_per_second = file_count / duration if duration > 0 else 0

    # Log summary
    print(f"""
Batch {batch_id} completed at {timestamp}:
- Files processed: {file_count}
- Success: {success_count}
- Errors: {error_count}
- Duration: {duration:.2f} seconds
- Performance: {files_per_second:.2f} files/second
""")

    result_df.unpersist()
(file_stream.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)  # Process available files and terminate
    .start()
    .awaitTermination())
05-12-2025 07:33 AM
Hey @daan_dw, thanks for the update.