daan_dw
New Contributor III

Hey LRALVA
The first time running your code I got the error:  PicklingError: Could not serialize object: Exception: You cannot use dbutils within a spark job You cannot use dbutils within a spark job or otherwise pickle it.

So I changed the copy_single_file function to the one below and now it works exactly as expected. Thanks for putting me on the right track!

import os
import re
import time

import boto3
from pyspark import SparkFiles
from pyspark.sql.functions import (
    col,
    concat,
    input_file_name,
    lit,
    regexp_extract,
    regexp_replace,
    udf,
)
from pyspark.sql.types import BooleanType, StringType

# Locations used further down in this script (all left empty in this listing):
#   source_path      - passed to the stream reader's load()
#   destination_path - prefix prepended to each file's relative path
#   checkpoint_path  - the stream writer's checkpointLocation
#   schema_path      - the reader's cloudFiles.schemaLocation
source_path = ""
destination_path = ""
checkpoint_path = ""
schema_path = ""


# Session settings, applied one by one in the order listed.
_SPARK_SETTINGS = (
    ("spark.sql.files.maxPartitionBytes", "128m"),  # Smaller partitions for more parallelism
    ("spark.sql.adaptive.enabled", "true"),  # Enable adaptive query execution
    ("spark.default.parallelism", 100),  # Adjust based on your cluster size
    ("spark.sql.shuffle.partitions", 100),  # Adjust based on your cluster size
    ("spark.databricks.io.cache.enabled", "true"),  # Enable IO cache if on Databricks
)
for _setting_name, _setting_value in _SPARK_SETTINGS:
    spark.conf.set(_setting_name, _setting_value)


def _split_s3_path(s3_path):
    """Split an S3 URI into its ``(bucket, key)`` parts.

    A leading ``s3://``, ``s3a://`` or ``s3n://`` scheme is removed (only at
    the start of the string); a path without a scheme is accepted as
    ``bucket/key``.

    Raises:
        ValueError: If the bucket or the key part is empty.
    """
    remainder = s3_path
    for scheme in ("s3://", "s3a://", "s3n://"):
        if s3_path.startswith(scheme):
            remainder = s3_path[len(scheme):]
            break

    # Everything before the first "/" is the bucket, the rest is the key.
    bucket, _, key = remainder.partition("/")
    if not bucket or not key:
        raise ValueError(
            f"Invalid S3 path (expected s3://bucket/key): {s3_path!r}"
        )
    return bucket, key


def copy_single_file(src_path, dest_path):
    """Copy one S3 object from ``src_path`` to ``dest_path``.

    Both paths are validated before any S3 client is created, so a malformed
    path is reported without issuing a request.

    Args:
        src_path: S3 URI of the object to copy.
        dest_path: S3 URI to copy the object to.

    Returns:
        ``"success"`` when the copy call returns, otherwise ``"error: <message>"``.
        The function never raises: every failure is turned into an error
        string so that one bad file is reported per row instead of aborting
        the caller.
    """
    try:
        source_bucket, source_key = _split_s3_path(src_path)
        dest_bucket, dest_key = _split_s3_path(dest_path)

        # The client is created inside the function rather than shared at
        # module level, so no client object has to be serialized with it.
        s3 = boto3.client('s3')

        # NOTE(review): per the S3 API documentation a single CopyObject call
        # is limited to 5 GB objects; larger objects will come back as errors.
        s3.copy_object(
            CopySource={'Bucket': source_bucket, 'Key': source_key},
            Bucket=dest_bucket,
            Key=dest_key
        )

        return "success"
    except Exception as e:
        return f"error: {str(e)}"

# Expose copy_single_file as a Spark UDF that returns its status string.
# The function copies a file as a side effect, so the UDF is marked
# non-deterministic: Spark assumes UDFs are deterministic by default and may
# then eliminate calls or invoke them more often than written in the query.
copy_file_udf = udf(copy_single_file, StringType()).asNondeterministic()

# Reader options, applied to the cloudFiles reader in the order listed.
_READER_OPTIONS = (
    ("cloudFiles.format", "binaryFile"),  # Read files in binary format
    ("cloudFiles.schemaLocation", schema_path),
    ("cloudFiles.includeExistingFiles", "true"),
    ("recursiveFileLookup", "true"),  # Search all subdirectories
    ("pathGlobFilter", "*"),  # Process all file types
    ("cloudFiles.useNotifications", "true"),  # Use S3 notifications if available
    ("cloudFiles.fetchParallelism", 64),  # Increase parallelism for listing files
    ("cloudFiles.maxFilesPerTrigger", 10000),  # Process more files per batch
    ("cloudFiles.region", "eu-central-1"),
)

_stream_reader = spark.readStream.format("cloudFiles")
for _option_name, _option_value in _READER_OPTIONS:
    _stream_reader = _stream_reader.option(_option_name, _option_value)

# Only select needed columns to reduce memory
file_stream = _stream_reader.load(source_path).select("path", "length", "modificationTime")

def process_batch(batch_df, batch_id):
    """Copy every file listed in one micro-batch and print a summary.

    For each row the destination is built by removing the ``source_path``
    prefix from the ``path`` column and prepending ``destination_path``; the
    copy itself is done by ``copy_file_udf``.

    Args:
        batch_df: DataFrame of the micro-batch; its ``path`` column is read.
        batch_id: Identifier of the micro-batch, used in the printed messages.

    Returns:
        None. Copy failures are printed, not raised.
        NOTE(review): because nothing is raised, a batch with failed copies
        still finishes normally - confirm that failed files are retried
        elsewhere if that matters.
    """
    start_time = time.time()

    # Count once and reuse the value; every count() launches a Spark job.
    file_count = batch_df.count()
    if file_count == 0:
        print(f"Batch {batch_id}: No files to process")
        return

    optimal_partitions = min(max(file_count // 1000, 8), 128)  # Between 8-128 partitions

    # regexp_replace interprets its pattern as a regular expression, while
    # source_path is a literal prefix: escape it and anchor it to the start
    # so only the leading occurrence is removed.
    source_prefix_pattern = "^" + re.escape(source_path)

    result_df = (
        batch_df
        # Repartition BEFORE the UDF column is added so that the copies
        # themselves are spread over the requested number of partitions.
        .repartition(optimal_partitions)
        # Create destination path by replacing source path with destination path
        .withColumn("relative_path", regexp_replace("path", source_prefix_pattern, ""))
        .withColumn("destination_path", concat(lit(destination_path), col("relative_path")))
        # Apply the copy operation to each file in parallel
        .withColumn("copy_result", copy_file_udf(col("path"), col("destination_path")))
        # Cached so the counts below reuse the copy results instead of
        # re-running the UDF for each action.
        .cache()
    )

    try:
        success_count = result_df.filter(col("copy_result").startswith("success")).count()
        errors_df = result_df.filter(col("copy_result").startswith("error"))
        error_count = errors_df.count()
        if error_count > 0:
            print(f"Batch {batch_id}: Found {error_count} errors. Sample errors:")
            errors_df.select("path", "copy_result").show(10, truncate=False)
    finally:
        # Release the cached data even when one of the actions above fails.
        result_df.unpersist()

    duration = time.time() - start_time
    files_per_second = file_count / duration if duration > 0 else 0
    # Taken here, not at the start, because the summary reports completion time.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

    # Log summary
    print(f"""
    Batch {batch_id} completed at {timestamp}:
    - Files processed: {file_count}
    - Success: {success_count}
    - Errors: {error_count}
    - Duration: {duration:.2f} seconds
    - Performance: {files_per_second:.2f} files/second
    """)

# Hand every micro-batch of file_stream to process_batch, then block until
# the query has finished.
_stream_writer = file_stream.writeStream.foreachBatch(process_batch)
_stream_writer = _stream_writer.option("checkpointLocation", checkpoint_path)
_stream_writer = _stream_writer.trigger(availableNow=True)  # Process available files and terminate
_streaming_query = _stream_writer.start()
_streaming_query.awaitTermination()

 

View solution in original post