Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-12-2025 01:29 AM
Hey LRALVA
The first time I ran your code, I got this error: `PicklingError: Could not serialize object: Exception: You cannot use dbutils within a spark job You cannot use dbutils within a spark job or otherwise pickle it.`
So I changed the `copy_single_file` function to the version below, which copies each object with boto3 instead of dbutils, and now it works exactly as expected. Thanks for putting me on the right track!
import os
import re
import time

import boto3
from pyspark import SparkFiles
from pyspark.sql.functions import (
    col,
    concat,
    input_file_name,
    lit,
    regexp_extract,
    regexp_replace,
    udf,
)
from pyspark.sql.types import BooleanType, StringType
# Fill these in before running. copy_single_file parses S3 URIs, so the two
# data paths must be S3 locations.
source_path = ""  # S3 URI prefix watched by Auto Loader; every file under it is copied
destination_path = ""  # S3 URI prefix that replaces source_path in each copied file's path
checkpoint_path = ""  # checkpointLocation of the streaming query at the bottom of this script
schema_path = ""  # cloudFiles.schemaLocation for Auto Loader

spark.conf.set("spark.sql.files.maxPartitionBytes", "128m")  # 128 MB is Spark's default; lower it for more, smaller partitions
spark.conf.set("spark.sql.adaptive.enabled", "true")  # Adaptive query execution (already the default on Spark 3.2+)
spark.conf.set("spark.sql.shuffle.partitions", 100)  # Adjust based on your cluster size
spark.conf.set("spark.databricks.io.cache.enabled", "true")  # Enable IO cache if on Databricks
# NOTE: spark.default.parallelism is intentionally not set here. It is a
# SparkContext setting read when the cluster starts; changing it through the
# session config (spark.conf.set) does not affect the running context. Put it
# in the cluster's Spark config if you need it.
def copy_single_file(src_path, dest_path):
try:
s3 = boto3.client('s3')
# Strip 's3://' and split into bucket and key
def split_s3_path(s3_path):
s3_path = s3_path.replace("s3://", "")
bucket = s3_path.split("/")[0]
key = "/".join(s3_path.split("/")[1:])
return bucket, key
source_bucket, source_key = split_s3_path(src_path)
dest_bucket, dest_key = split_s3_path(dest_path)
s3.copy_object(
CopySource={'Bucket': source_bucket, 'Key': source_key},
Bucket=dest_bucket,
Key=dest_key
)
return "success"
except Exception as e:
return f"error: {str(e)}"
# Spark UDF wrapper around copy_single_file, returning its "success" /
# "error: ..." string. The UDF has a side effect (it writes to S3), so it is
# marked non-deterministic: Spark treats UDFs as deterministic by default and
# may then eliminate or repeat invocations while optimizing the query.
copy_file_udf = udf(copy_single_file, StringType()).asNondeterministic()
# Incrementally discover every file under source_path with Auto Loader and
# keep only the file metadata columns; the binary "content" column is never
# selected.
file_stream = (
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "binaryFile",  # one row per file
        "cloudFiles.schemaLocation": schema_path,
        "cloudFiles.includeExistingFiles": "true",  # also pick up files already present
        "recursiveFileLookup": "true",  # descend into all subdirectories
        "pathGlobFilter": "*",  # no file-name filtering
        # Notification mode; NOTE(review): it needs permissions to set up the
        # S3 event notifications and does not silently fall back to directory
        # listing — confirm the cluster has them.
        "cloudFiles.useNotifications": "true",
        "cloudFiles.fetchParallelism": 64,
        "cloudFiles.maxFilesPerTrigger": 10000,  # upper bound on files per micro-batch
        "cloudFiles.region": "eu-central-1",
    })
    .load(source_path)
    .select("path", "length", "modificationTime")
)
def process_batch(batch_df, batch_id):
    """Copy every file in one Auto Loader micro-batch and print a summary.

    Used as the ``foreachBatch`` callback of the streaming query.

    Args:
        batch_df: Micro-batch DataFrame with at least a ``path`` column (the
            stream selects ``path``, ``length`` and ``modificationTime``).
        batch_id: Micro-batch id supplied by Structured Streaming; used only
            in the printed output.

    Each file is copied by ``copy_file_udf`` to ``destination_path`` followed
    by the file's path with the leading ``source_path`` removed. Copy failures
    do not fail the batch; they are counted and up to 10 are printed.
    """
    start_time = time.time()

    # Count once: each action on batch_df re-evaluates the micro-batch.
    file_count = batch_df.count()
    if file_count == 0:
        print(f"Batch {batch_id}: No files to process")
        return

    optimal_partitions = min(max(file_count // 1000, 8), 128)  # Between 8-128 partitions

    # source_path is a literal prefix, not a regex: escape it and anchor it so
    # only the leading prefix is removed (and characters such as "." or "+" in
    # the path are matched literally).
    # NOTE(review): if the "path" column does not start with source_path
    # verbatim (e.g. a different scheme), the full path is appended to
    # destination_path — confirm both use the same form.
    source_prefix_pattern = "^" + re.escape(source_path)

    result_df = (
        batch_df
        # Repartition BEFORE adding the UDF column: a UDF defined upstream of a
        # repartition is evaluated before the shuffle, with the incoming
        # partitioning, so the copies would not be spread over
        # optimal_partitions tasks.
        .repartition(optimal_partitions)
        .withColumn("relative_path", regexp_replace("path", source_prefix_pattern, ""))
        .withColumn("destination_path", concat(lit(destination_path), col("relative_path")))
        # Apply the copy operation to each file in parallel.
        .withColumn("copy_result", copy_file_udf(col("path"), col("destination_path")))
        # Cache so the follow-up error listing reuses the results instead of
        # running the copies again.
        .cache()
    )
    try:
        # One aggregation (a single action) instead of two filtered counts.
        status_rows = (
            result_df
            .groupBy(regexp_extract("copy_result", r"^(success|error)", 1).alias("status"))
            .count()
            .collect()
        )
        # row["count"], not row.count: Row.count is the tuple method.
        status_counts = {row["status"]: row["count"] for row in status_rows}
        success_count = status_counts.get("success", 0)
        error_count = status_counts.get("error", 0)

        if error_count > 0:
            print(f"Batch {batch_id}: Found {error_count} errors. Sample errors:")
            (result_df
                .filter(col("copy_result").startswith("error"))
                .select("path", "copy_result")
                .show(10, truncate=False))

        duration = time.time() - start_time
        files_per_second = file_count / duration if duration > 0 else 0
        # Taken after the work is done so the "completed at" time is accurate.
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        # Log summary
        print(f"""
Batch {batch_id} completed at {timestamp}:
- Files processed: {file_count}
- Success: {success_count}
- Errors: {error_count}
- Duration: {duration:.2f} seconds
- Performance: {files_per_second:.2f} files/second
""")
    finally:
        # Release the cached results even if reporting fails.
        result_df.unpersist()
# Run the copy job: handle every file available right now in micro-batches
# (process_batch is called once per batch), then stop; block until finished.
(
    file_stream.writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .foreachBatch(process_batch)
    .start()
    .awaitTermination()
)