Data Engineering

Copy file structure including files from one storage to another incrementally using PySpark

shreya_20202
New Contributor II

I have a storage account dexflex and two containers, source and destination. The source container has directories and files as below:

results  
    search
        03
            Module19111.json
            Module19126.json
        04
            Module11291.json
            Module19222.json
    product
        03
            Module18867.json
            Module182625.json
        04
            Module122251.json
            Module192287.json

I am trying to copy the data incrementally from the source to the destination container using the code snippet below:

from datetime import datetime, timedelta
from pyspark.sql import SparkSession


# Set up the source and destination storage account configurations
source_account_name = "dev-stor"
source_container_name = "results"
destination_account_name = "dev-stor"
destination_container_name = "results"

# Set up the source and destination base paths
source_base_path = f"abfss://{source_container_name}@{source_account_name}.dfs.core.windows.net"
destination_path = f"abfss://{destination_container_name}@{destination_account_name}.dfs.core.windows.net/copy-data-2024"

# Set up the date range for incremental copy (not used yet)
start_date = datetime(2024, 3, 1)
end_date = datetime(2999, 12, 12)

# Recursively copy each top-level folder, preserving the directory hierarchy
for folder in ["search", "product"]:
    dbutils.fs.cp(f"{source_base_path}/{folder}", f"{destination_path}/{folder}", recurse=True)

The above code does a full copy; however, I am looking for an incremental copy, i.e., on the next run only the new files should be copied.

P.S. The directory hierarchy must remain the same.

I also tried Auto Loader but was unable to maintain the same hierarchical directory structure.

Can I get some expert advice, please?

1 REPLY

Kaniz
Community Manager

Hi @shreya_20202, it looks like you’re trying to incrementally copy data from the source container to the destination container in Azure Databricks. To achieve this, you’ll need to compare the files in the source and destination directories and copy only the new or modified files.

Here’s an approach you can follow for incremental copying:

  1. List Files in Source and Destination: First, list the files in both the source and destination directories. You can use the dbutils.fs.ls() function to retrieve the file paths and metadata (see the short FileInfo sketch after this list).

  2. Compare Timestamps: Compare the timestamps (last modified time) of files in the source and destination directories. Identify files that are newer in the source than in the destination.

  3. Copy New Files: Copy the new files from the source to the destination. You can use the dbutils.fs.cp() function to copy individual files.

  4. Maintain Directory Hierarchy: To maintain the same hierarchical directory structure, create the corresponding directories in the destination if they don’t exist.
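
For reference, each entry returned by dbutils.fs.ls() is a FileInfo object exposing path, name, size, and (on recent Databricks Runtime versions) modificationTime in epoch milliseconds, which is what the timestamp comparison relies on. Here is a quick sketch; the folder path is just an example taken from your layout:

# Each FileInfo returned by dbutils.fs.ls() carries the metadata needed for the comparison
for f in dbutils.fs.ls("abfss://results@dev-stor.dfs.core.windows.net/search/03"):
    # path, name, size, and modificationTime (epoch milliseconds on recent runtimes)
    print(f.path, f.size, f.modificationTime)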

Below, I’ve modified your code snippet to perform an incremental copy based on timestamps while preserving the directory hierarchy. Note that this sketch relies on the file modification time reported by dbutils.fs.ls().

# Set up the source and destination storage account configurations
source_account_name = "dev-stor"
source_container_name = "results"
destination_account_name = "dev-stor"
destination_container_name = "results"

# Set up the source and destination base paths
source_base_path = f"abfss://{source_container_name}@{source_account_name}.dfs.core.windows.net"
destination_base_path = f"abfss://{destination_container_name}@{destination_account_name}.dfs.core.windows.net/copy-data-2024"

def list_files_recursively(path):
    """Return every file (not directory) under the given path."""
    files = []
    for item in dbutils.fs.ls(path):
        if item.isDir():
            files.extend(list_files_recursively(item.path))
        else:
            files.append(item)
    return files

def needs_copy(source_file, destination_file_path):
    """Return True if the destination file is missing or older than the source file."""
    try:
        destination_info = dbutils.fs.ls(destination_file_path)[0]
        # modificationTime is in epoch milliseconds (available on recent Databricks Runtimes)
        return destination_info.modificationTime < source_file.modificationTime
    except Exception:
        return True  # destination file does not exist yet

# Compare timestamps and copy only new or modified files, recreating the
# same directory hierarchy under the destination path
for source_folder in ["search", "product"]:
    for file_info in list_files_recursively(f"{source_base_path}/{source_folder}"):
        destination_file_path = file_info.path.replace(source_base_path, destination_base_path)
        if needs_copy(file_info, destination_file_path):
            print(f"Copying new file: {file_info.path} -> {destination_file_path}")
            dbutils.fs.cp(file_info.path, destination_file_path)

print("Incremental copy completed successfully!")

Remember to replace the placeholders with your actual storage account names and container names. Additionally, ensure that you have the necessary permissions to read from the source and write to the destination.
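
If you access the containers directly over abfss:// with a service principal, the storage credentials can be set in the notebook before running the copy. Here is a minimal sketch, assuming the secret scope, key, application (client) ID, and directory (tenant) ID below are placeholders you replace with your own values:

# Minimal sketch: authenticate to the storage account with a service principal (OAuth).
# The secret scope/key and the <application-id>/<directory-id> values are placeholders.
service_credential = dbutils.secrets.get(scope="<secret-scope>", key="<service-credential-key>")

spark.conf.set("fs.azure.account.auth.type.dev-stor.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.dev-stor.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.dev-stor.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.dev-stor.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.dev-stor.dfs.core.windows.net",
               "https://login.microsoftonline.com/<directory-id>/oauth2/token")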

Feel free to adjust the code snippet according to your specific requirements. If you encounter any issues or need further assistance, feel free to ask! 😊