<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Auto Loader for copying files on s3 in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118856#M45728</link>
    <description>&lt;P&gt;Hey LRALVA&lt;BR /&gt;The first time running your code I got the error:&amp;nbsp;&lt;SPAN&gt;&amp;nbsp;&lt;EM&gt;PicklingError: Could not serialize object: Exception: You cannot use dbutils within a spark job You cannot use dbutils within a spark job or otherwise pickle it.&lt;/EM&gt;&lt;BR /&gt;&lt;BR /&gt;So I changed the copy_single_file function to the one below and now it works exactly as expected. Thanks for putting me on the right track!&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;from pyspark.sql.functions import col, lit, input_file_name, regexp_replace, regexp_extract, concat
from pyspark.sql.types import BooleanType, StringType
from pyspark import SparkFiles
import os
import time
import boto3

source_path = ""
destination_path = ""
checkpoint_path = ""
schema_path = ""


spark.conf.set("spark.sql.files.maxPartitionBytes", "128m") # Smaller partitions for more parallelism
spark.conf.set("spark.sql.adaptive.enabled", "true") # Enable adaptive query execution
spark.conf.set("spark.default.parallelism", 100) # Adjust based on your cluster size
spark.conf.set("spark.sql.shuffle.partitions", 100) # Adjust based on your cluster size
spark.conf.set("spark.databricks.io.cache.enabled", "true") # Enable IO cache if on Databricks


def copy_single_file(src_path, dest_path):
    try:
        s3 = boto3.client('s3')
        
        # Strip 's3://' and split into bucket and key
        def split_s3_path(s3_path):
            s3_path = s3_path.replace("s3://", "")
            bucket = s3_path.split("/")[0]
            key = "/".join(s3_path.split("/")[1:])
            return bucket, key

        source_bucket, source_key = split_s3_path(src_path)
        dest_bucket, dest_key = split_s3_path(dest_path)

        s3.copy_object(
            CopySource={'Bucket': source_bucket, 'Key': source_key},
            Bucket=dest_bucket,
            Key=dest_key
        )

        return "success"
    except Exception as e:
        return f"error: {str(e)}"

copy_file_udf = udf(copy_single_file, StringType())

file_stream = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "binaryFile") # Read files in binary format
.option("cloudFiles.schemaLocation", schema_path)
.option("cloudFiles.includeExistingFiles", "true")
.option("recursiveFileLookup", "true") # Search all subdirectories
.option("pathGlobFilter", "*") # Process all file types
.option("cloudFiles.useNotifications", "true") # Use S3 notifications if available
.option("cloudFiles.fetchParallelism", 64) # Increase parallelism for listing files
.option("cloudFiles.maxFilesPerTrigger", 10000) # Process more files per batch
.option("cloudFiles.region", "eu-central-1")
.load(source_path)
.select("path", "length", "modificationTime")) # Only select needed columns to reduce memory

def process_batch(batch_df, batch_id):
    start_time = time.time()
    if batch_df.count() == 0:
        print(f"Batch {batch_id}: No files to process")
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    processed_df = (batch_df
    # Create destination path by replacing source path with destination path
    .withColumn("relative_path",
    regexp_replace("path", source_path, ""))
    .withColumn("destination_path",
    concat(lit(destination_path), col("relative_path")))
    # Apply the copy operation to each file in parallel
    .withColumn("copy_result",
    copy_file_udf(col("path"), col("destination_path")))
    )
    file_count = batch_df.count()
    optimal_partitions = min(max(file_count // 1000, 8), 128) # Between 8-128 partitions
    result_df = processed_df.repartition(optimal_partitions).cache()
    success_count = result_df.filter(col("copy_result").startswith("success")).count()
    error_count = result_df.filter(col("copy_result").startswith("error")).count()
    if error_count &amp;gt; 0:
        errors_df = result_df.filter(col("copy_result").startswith("error"))
        print(f"Batch {batch_id}: Found {error_count} errors. Sample errors:")
        errors_df.select("path", "copy_result").show(10, truncate=False)
    duration = time.time() - start_time
    files_per_second = file_count / duration if duration &amp;gt; 0 else 0

    # Log summary
    print(f"""
    Batch {batch_id} completed at {timestamp}:
    - Files processed: {file_count}
    - Success: {success_count}
    - Errors: {error_count}
    - Duration: {duration:.2f} seconds
    - Performance: {files_per_second:.2f} files/second
    """)
    result_df.unpersist()

(file_stream.writeStream
.foreachBatch(process_batch)
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True) # Process available files and terminate
.start()
.awaitTermination())&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 12 May 2025 08:29:39 GMT</pubDate>
    <dc:creator>daan_dw</dc:creator>
    <dc:date>2025-05-12T08:29:39Z</dc:date>
    <item>
      <title>Auto Loader for copying files on s3</title>
      <link>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118549#M45655</link>
      <description>&lt;P&gt;Hey community,&lt;BR /&gt;&lt;BR /&gt;I have a folder on s3 with around 5 million small files. On a daily basis new files are added. I would like to simply copy those new files to another folder on s3. My approach is to use an Auto Loader of which I attached the code below. The code works but is too slow. Is there any way to speed up this process? Or is there another approach without Auto Loader that is faster?&lt;BR /&gt;&lt;BR /&gt;Thanks a lot!&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;from pyspark.sql.functions import col, lit
import datetime

source_path = "s3://"
destination_path = "s3://"
checkpoint_path = "/tmp/autoloader_checkpoints/test_job"
schema_path = "/tmp/autoloader_schemas/test_job"


stream_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.includeExistingFiles", "false") 
    .option("recursiveFileLookup", "true")  
    .option("pathGlobFilter", "*")  
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.region", "eu-central-1")
    .load(source_path))

# Process each file while preserving directory structure
def copy_files(batch_df, batch_id):
    for row in batch_df.collect():
        try:
            src_path = row['path']
            relative_path = src_path.replace(source_path, "")
            dest_path = destination_path + relative_path
            
            parent_dir = "/".join(dest_path.split("/")[:-1])
            dbutils.fs.mkdirs(parent_dir)
            dbutils.fs.cp(src_path, dest_path)
        except Exception as e:
            print(f"Failed to copy {src_path}: {str(e)}")

(stream_df.writeStream
    .foreachBatch(copy_files)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start()
    .awaitTermination())&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 08 May 2025 15:20:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118549#M45655</guid>
      <dc:creator>daan_dw</dc:creator>
      <dc:date>2025-05-08T15:20:32Z</dc:date>
    </item>
    <item>
      <title>Re: Auto Loader for copying files on s3</title>
      <link>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118586#M45659</link>
      <description>&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/149877"&gt;@daan_dw&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You're facing a classic small files problem in S3, which is challenging to solve efficiently. Your current Auto Loader approach has performance limitations when processing millions of small files individually. Let me suggest several optimizations and alternative approaches to speed up this process.&lt;BR /&gt;Key Performance Issues&lt;/P&gt;&lt;P&gt;Your copy_files function processes files one by one with dbutils.fs.cp, creating a lot of overhead&lt;BR /&gt;Using batch_df.collect() brings all file paths to the driver, creating memory pressure&lt;BR /&gt;Individual S3 operations have high latency, especially when done sequentially.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Try below code:&lt;/STRONG&gt;&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;input_file_name&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;regexp_replace&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;regexp_extract&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.types &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; BooleanType, StringType&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; SparkFiles&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;os&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;time&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# S3 paths configuration&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;source_path&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; 
&lt;SPAN&gt;"s3://source-bucket/source-folder/"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;destination_path&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"s3://destination-bucket/destination-folder/"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;checkpoint_path&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"/tmp/autoloader_checkpoints/file_copy_job"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;schema_path&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"/tmp/autoloader_schemas/file_copy_job"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Performance tuning configuration&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.set(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.sql.files.maxPartitionBytes"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"128m"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Smaller partitions for more parallelism&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.set(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.sql.adaptive.enabled"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Enable adaptive query execution&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.set(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.default.parallelism"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;100&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Adjust based on your cluster size&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.set(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.sql.shuffle.partitions"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;100&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Adjust based on your cluster size&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.set(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.databricks.io.cache.enabled"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Enable IO cache if on Databricks&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define 
a UDF for copying files that preserves structure and handles errors&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;copy_single_file&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;src_path&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;dest_path&lt;/SPAN&gt;&lt;SPAN&gt;):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Ensure parent directory exists&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;parent_dir&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"/"&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;join&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;dest_path&lt;/SPAN&gt;&lt;SPAN&gt;.split(&lt;/SPAN&gt;&lt;SPAN&gt;"/"&lt;/SPAN&gt;&lt;SPAN&gt;)[:&lt;/SPAN&gt;&lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;]) &lt;/SPAN&gt;&lt;SPAN&gt;+&lt;/SPAN&gt; &lt;SPAN&gt;"/"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dbutils.fs.mkdirs(&lt;/SPAN&gt;&lt;SPAN&gt;parent_dir&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Copy the file&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dbutils.fs.cp(&lt;/SPAN&gt;&lt;SPAN&gt;src_path&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;dest_path&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;return&lt;/SPAN&gt; &lt;SPAN&gt;"success"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;except&lt;/SPAN&gt; &lt;SPAN&gt;Exception&lt;/SPAN&gt; &lt;SPAN&gt;as&lt;/SPAN&gt; &lt;SPAN&gt;e&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;return&lt;/SPAN&gt; &lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"error: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;str&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;e&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR 
/&gt;&lt;DIV&gt;&lt;SPAN&gt;# Register the UDF&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;copy_file_udf&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; udf(&lt;/SPAN&gt;&lt;SPAN&gt;copy_single_file&lt;/SPAN&gt;&lt;SPAN&gt;, StringType())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Set up the Auto Loader stream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;file_stream&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.format(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"binaryFile"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Read files in binary format&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.schemaLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;schema_path&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.includeExistingFiles"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"false"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"recursiveFileLookup"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Search all subdirectories&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"pathGlobFilter"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"*"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Process all file types&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.useNotifications"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Use S3 notifications if 
available&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.fetchParallelism"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;64&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Increase parallelism for listing files&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.maxFilesPerTrigger"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;10000&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Process more files per batch&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.region"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"eu-central-1"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.load(&lt;/SPAN&gt;&lt;SPAN&gt;source_path&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.select(&lt;/SPAN&gt;&lt;SPAN&gt;"path"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"length"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"modificationTime"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;# Only select needed columns to reduce memory&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Process each batch efficiently&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;process_batch&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;batch_df&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;start_time&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;time&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;time&lt;/SPAN&gt;&lt;SPAN&gt;()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Skip empty batches&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;if&lt;/SPAN&gt; &lt;SPAN&gt;batch_df&lt;/SPAN&gt;&lt;SPAN&gt;.count() 
&lt;/SPAN&gt;&lt;SPAN&gt;==&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"Batch &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;: No files to process"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Get timestamp for logging&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;timestamp&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;time&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;strftime&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"%Y-%m-&lt;/SPAN&gt;&lt;SPAN&gt;%d&lt;/SPAN&gt;&lt;SPAN&gt; %H:%M:%S"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Calculate the destination path for each file by preserving directory structure&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;processed_df&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;batch_df&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Create destination path by replacing source path with destination path&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.withColumn(&lt;/SPAN&gt;&lt;SPAN&gt;"relative_path"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;regexp_replace&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"path"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;source_path&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;""&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.withColumn(&lt;/SPAN&gt;&lt;SPAN&gt;"destination_path"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;concat(&lt;/SPAN&gt;&lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;destination_path&lt;/SPAN&gt;&lt;SPAN&gt;), 
&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"relative_path"&lt;/SPAN&gt;&lt;SPAN&gt;)))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Apply the copy operation to each file in parallel&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.withColumn(&lt;/SPAN&gt;&lt;SPAN&gt;"copy_result"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;copy_file_udf&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"path"&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"destination_path"&lt;/SPAN&gt;&lt;SPAN&gt;)))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Repartition for better parallelism based on file size distribution&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# More partitions = more parallel operations&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;file_count&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;batch_df&lt;/SPAN&gt;&lt;SPAN&gt;.count()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;optimal_partitions&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;min&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;max&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;file_count&lt;/SPAN&gt; &lt;SPAN&gt;//&lt;/SPAN&gt; &lt;SPAN&gt;1000&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;8&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;128&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Between 8-128 partitions&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Force execution and collect metrics&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;result_df&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; 
&lt;SPAN&gt;processed_df&lt;/SPAN&gt;&lt;SPAN&gt;.repartition(&lt;/SPAN&gt;&lt;SPAN&gt;optimal_partitions&lt;/SPAN&gt;&lt;SPAN&gt;).cache()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Trigger execution and collect stats&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;success_count&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;result_df&lt;/SPAN&gt;&lt;SPAN&gt;.filter(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"copy_result"&lt;/SPAN&gt;&lt;SPAN&gt;).startswith(&lt;/SPAN&gt;&lt;SPAN&gt;"success"&lt;/SPAN&gt;&lt;SPAN&gt;)).count()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;error_count&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;result_df&lt;/SPAN&gt;&lt;SPAN&gt;.filter(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"copy_result"&lt;/SPAN&gt;&lt;SPAN&gt;).startswith(&lt;/SPAN&gt;&lt;SPAN&gt;"error"&lt;/SPAN&gt;&lt;SPAN&gt;)).count()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Log errors for investigation&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;if&lt;/SPAN&gt; &lt;SPAN&gt;error_count&lt;/SPAN&gt; &lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;errors_df&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;result_df&lt;/SPAN&gt;&lt;SPAN&gt;.filter(&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"copy_result"&lt;/SPAN&gt;&lt;SPAN&gt;).startswith(&lt;/SPAN&gt;&lt;SPAN&gt;"error"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"Batch &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;: Found &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;error_count&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt; errors. 
Sample errors:"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;errors_df&lt;/SPAN&gt;&lt;SPAN&gt;.select(&lt;/SPAN&gt;&lt;SPAN&gt;"path"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"copy_result"&lt;/SPAN&gt;&lt;SPAN&gt;).show(&lt;/SPAN&gt;&lt;SPAN&gt;10&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;truncate&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;False&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Optionally write errors to a log location&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;errors_df&lt;/SPAN&gt;&lt;SPAN&gt;.write.mode(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;).parquet(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;destination_path&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;/_error_logs/&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Calculate performance metrics&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;duration&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;time&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;time&lt;/SPAN&gt;&lt;SPAN&gt;() &lt;/SPAN&gt;&lt;SPAN&gt;-&lt;/SPAN&gt; &lt;SPAN&gt;start_time&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;files_per_second&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;file_count&lt;/SPAN&gt; &lt;SPAN&gt;/&lt;/SPAN&gt; &lt;SPAN&gt;duration&lt;/SPAN&gt; &lt;SPAN&gt;if&lt;/SPAN&gt; &lt;SPAN&gt;duration&lt;/SPAN&gt; &lt;SPAN&gt;&amp;gt;&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt; &lt;SPAN&gt;else&lt;/SPAN&gt; &lt;SPAN&gt;0&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Log 
summary&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;print&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"""&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Batch &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;batch_id&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt; completed at &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;timestamp&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;- Files processed: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;file_count&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;- Success: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;success_count&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;- Errors: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;error_count&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;- Duration: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;duration&lt;/SPAN&gt;&lt;SPAN&gt;:.2f}&lt;/SPAN&gt;&lt;SPAN&gt; seconds&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;- Performance: &lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;files_per_second&lt;/SPAN&gt;&lt;SPAN&gt;:.2f}&lt;/SPAN&gt;&lt;SPAN&gt; files/second&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;"""&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Unpersist to free memory&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;result_df&lt;/SPAN&gt;&lt;SPAN&gt;.unpersist()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Execute the streaming job&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;file_stream&lt;/SPAN&gt;&lt;SPAN&gt;.writeStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.foreachBatch(&lt;/SPAN&gt;&lt;SPAN&gt;process_batch&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.option(&lt;/SPAN&gt;&lt;SPAN&gt;"checkpointLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, 
&lt;/SPAN&gt;&lt;SPAN&gt;checkpoint_path&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.trigger(&lt;/SPAN&gt;&lt;SPAN&gt;availableNow&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;# Process available files and terminate&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.start()&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;.awaitTermination())&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 08 May 2025 20:49:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118586#M45659</guid>
      <dc:creator>lingareddy_Alva</dc:creator>
      <dc:date>2025-05-08T20:49:44Z</dc:date>
    </item>
    <item>
      <title>Re: Auto Loader for copying files on s3</title>
      <link>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118856#M45728</link>
      <description>&lt;P&gt;Hey LRALVA&lt;BR /&gt;The first time running your code I got the error:&amp;nbsp;&lt;SPAN&gt;&amp;nbsp;&lt;EM&gt;PicklingError: Could not serialize object: Exception: You cannot use dbutils within a spark job You cannot use dbutils within a spark job or otherwise pickle it.&lt;/EM&gt;&lt;BR /&gt;&lt;BR /&gt;So I changed the copy_single_file function to the one below and now it works exactly as expected. Thanks for putting me on the right track!&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;from pyspark.sql.functions import col, lit, input_file_name, regexp_replace, regexp_extract, concat
from pyspark.sql.types import BooleanType, StringType
from pyspark import SparkFiles
import os
import time
import boto3

source_path = ""
destination_path = ""
checkpoint_path = ""
schema_path = ""


spark.conf.set("spark.sql.files.maxPartitionBytes", "128m") # Smaller partitions for more parallelism
spark.conf.set("spark.sql.adaptive.enabled", "true") # Enable adaptive query execution
spark.conf.set("spark.default.parallelism", 100) # Adjust based on your cluster size
spark.conf.set("spark.sql.shuffle.partitions", 100) # Adjust based on your cluster size
spark.conf.set("spark.databricks.io.cache.enabled", "true") # Enable IO cache if on Databricks


def copy_single_file(src_path, dest_path):
    try:
        s3 = boto3.client('s3')
        
        # Strip 's3://' and split into bucket and key
        def split_s3_path(s3_path):
            s3_path = s3_path.replace("s3://", "")
            bucket = s3_path.split("/")[0]
            key = "/".join(s3_path.split("/")[1:])
            return bucket, key

        source_bucket, source_key = split_s3_path(src_path)
        dest_bucket, dest_key = split_s3_path(dest_path)

        s3.copy_object(
            CopySource={'Bucket': source_bucket, 'Key': source_key},
            Bucket=dest_bucket,
            Key=dest_key
        )

        return "success"
    except Exception as e:
        return f"error: {str(e)}"

copy_file_udf = udf(copy_single_file, StringType())

file_stream = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "binaryFile") # Read files in binary format
.option("cloudFiles.schemaLocation", schema_path)
.option("cloudFiles.includeExistingFiles", "true")
.option("recursiveFileLookup", "true") # Search all subdirectories
.option("pathGlobFilter", "*") # Process all file types
.option("cloudFiles.useNotifications", "true") # Use S3 notifications if available
.option("cloudFiles.fetchParallelism", 64) # Increase parallelism for listing files
.option("cloudFiles.maxFilesPerTrigger", 10000) # Process more files per batch
.option("cloudFiles.region", "eu-central-1")
.load(source_path)
.select("path", "length", "modificationTime")) # Only select needed columns to reduce memory

def process_batch(batch_df, batch_id):
    start_time = time.time()
    if batch_df.count() == 0:
        print(f"Batch {batch_id}: No files to process")
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    processed_df = (batch_df
    # Create destination path by replacing source path with destination path
    .withColumn("relative_path",
    regexp_replace("path", source_path, ""))
    .withColumn("destination_path",
    concat(lit(destination_path), col("relative_path")))
    # Apply the copy operation to each file in parallel
    .withColumn("copy_result",
    copy_file_udf(col("path"), col("destination_path")))
    )
    file_count = batch_df.count()
    optimal_partitions = min(max(file_count // 1000, 8), 128) # Between 8-128 partitions
    result_df = processed_df.repartition(optimal_partitions).cache()
    success_count = result_df.filter(col("copy_result").startswith("success")).count()
    error_count = result_df.filter(col("copy_result").startswith("error")).count()
    if error_count &amp;gt; 0:
        errors_df = result_df.filter(col("copy_result").startswith("error"))
        print(f"Batch {batch_id}: Found {error_count} errors. Sample errors:")
        errors_df.select("path", "copy_result").show(10, truncate=False)
    duration = time.time() - start_time
    files_per_second = file_count / duration if duration &amp;gt; 0 else 0

    # Log summary
    print(f"""
    Batch {batch_id} completed at {timestamp}:
    - Files processed: {file_count}
    - Success: {success_count}
    - Errors: {error_count}
    - Duration: {duration:.2f} seconds
    - Performance: {files_per_second:.2f} files/second
    """)
    result_df.unpersist()

(file_stream.writeStream
.foreachBatch(process_batch)
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True) # Process available files and terminate
.start()
.awaitTermination())&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 12 May 2025 08:29:39 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118856#M45728</guid>
      <dc:creator>daan_dw</dc:creator>
      <dc:date>2025-05-12T08:29:39Z</dc:date>
    </item>
    <item>
      <title>Re: Auto Loader for copying files on s3</title>
      <link>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118916#M45742</link>
      <description>&lt;P&gt;Hey &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/149877"&gt;@daan_dw&lt;/a&gt;&amp;nbsp;Thanks for the update.&lt;/P&gt;</description>
      <pubDate>Mon, 12 May 2025 14:33:03 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/auto-loader-for-copying-files-on-s3/m-p/118916#M45742</guid>
      <dc:creator>lingareddy_Alva</dc:creator>
      <dc:date>2025-05-12T14:33:03Z</dc:date>
    </item>
  </channel>
</rss>

