Data Engineering

How to make Autoloader delete files after a successful load

Arty
New Contributor II

Hi All

Can you please advise how I can delete files from Azure Storage once they have been successfully loaded via Autoloader? As I understand it, the Spark Structured Streaming "cleanSource" option is not available for Autoloader, so I'm trying to find the best way to clean up the landing zone.


5 REPLIES

Anonymous
Not applicable

@Artem Sachuk:

One way to clean up the landing zone is to use the Azure Storage SDK in a script or job that runs after the files have been successfully loaded via Autoloader.

First, you can use the Databricks dbutils.fs.ls() command to get the list of files in the landing zone directory. Then, using the Azure Storage SDK, you can delete the files that have already been loaded.

Here is an example Python code snippet that demonstrates this approach:

import os
from azure.storage.blob import BlobServiceClient

# Set your Azure Storage account details
account_name = "your_account_name"
account_key = "your_account_key"
container_name = "your_container_name"

# Set your landing zone directory path (a DBFS mount of the container above)
landing_zone_path = "/mnt/landing_zone"

# Create a BlobServiceClient object to connect to your storage account
connect_str = "DefaultEndpointsProtocol=https;AccountName={};AccountKey={};EndpointSuffix=core.windows.net".format(account_name, account_key)
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# Get the list of files in the landing zone directory
landing_zone_files = dbutils.fs.ls(landing_zone_path)

# Loop through each file in the landing zone
for file in landing_zone_files:
    file_path = file.path
    file_name = os.path.basename(file_path)

    # Check if the file has already been loaded
    # (you may need to modify this logic based on your specific use case)
    if file_name.startswith("loaded_"):

        # Delete the file from Azure Storage. The blob name must be the path
        # relative to the container root, so prepend any folder prefix if the
        # mount does not point at the container root. Because the mount and the
        # blob client refer to the same storage, no separate dbutils.fs.rm()
        # call is needed afterwards -- the file is already gone.
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)
        blob_client.delete_blob()

Note that in this example, the landing zone files that have already been loaded are assumed to have a prefix of "loaded_". You may need to modify the logic to suit your specific use case. Also, make sure to replace the account_name, account_key, landing_zone_path, and container_name variables with your own values.
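If the landing zone container is already mounted in DBFS (as the /mnt/landing_zone path above suggests), a simpler variant can skip the Storage SDK and delete through dbutils alone. A minimal sketch under that assumption:

import os

# Assumes the container is mounted at /mnt/landing_zone; dbutils is available in
# Databricks notebooks, and dbutils.fs.rm() removes the underlying blob via the mount.
landing_zone_path = "/mnt/landing_zone"

for file in dbutils.fs.ls(landing_zone_path):
    # Same placeholder "already loaded" check as above; adapt it to your use case.
    if os.path.basename(file.path).startswith("loaded_"):
        dbutils.fs.rm(file.path)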

Arty
New Contributor II

Thank you very much, @Suteja Kanuri.

I guess the only thing I would need is to understand which files were loaded, right?

Can I somehow query the Autoloader checkpoints, or do I need to keep track of the loaded files myself, for example by adding input_file_name() to my data?

Anonymous
Not applicable

@Artem Sachuk:

Yes, to keep track of which files were loaded by Autoloader, you can add a column populated by the input_file_name() function to your DataFrame during the loading process. This way, you know exactly which file each row of data came from.
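A minimal sketch of that approach, assuming JSON source files and a hypothetical /mnt/landing_zone path:

from pyspark.sql.functions import input_file_name

# Record the originating file path as a column while Auto Loader reads the data.
# The path and file format below are illustrative; adjust them to your landing zone.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/mnt/landing_zone")
      .withColumn("source_file", input_file_name()))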

Alternatively, you can rely on Autoloader's checkpoint mechanism, which already keeps track of which files have been successfully loaded. The checkpoint stores file metadata (such as file name, size, and modification time) under the checkpoint location, and this state is what guarantees each file is processed exactly once; you can also inspect it to see which files have already been ingested.

To enable checkpointing, you specify a checkpoint location when you start the stream's write (the checkpointLocation option belongs on writeStream, not on readStream). First, define the Autoloader source and choose a checkpoint directory:

checkpoint_dir = "/mnt/checkpoints"

# Define the Auto Loader source; cloudFiles.format must match the files in the
# landing zone (JSON is used here as an example).
autoloader = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load(path))

The checkpoint directory can be any path in a file system that supports write access. Make sure that the checkpoint directory is on a durable file system (such as HDFS or Azure Blob Storage) and not on a local disk, as the checkpoint information needs to survive driver or executor failures.

Once the checkpoint location is set, the stream records its progress in that directory, including which input files have already been ingested. While the query is running, you can inspect batch-level progress through the StreamingQuery.recentProgress property.

For example:

import time

# Write the stream out; the checkpoint directory is set here, on the writeStream.
query = (autoloader.writeStream
    .option("checkpointLocation", checkpoint_dir)
    .outputMode("append")
    .format("console")
    .start())

seen_batches = set()
while query.isActive:
    # recentProgress is a list of per-batch progress dictionaries
    for progress in query.recentProgress:
        if progress["batchId"] not in seen_batches:
            seen_batches.add(progress["batchId"])
            sources = ", ".join(s["description"] for s in progress["sources"])
            print("Batch {}: loaded {} rows from {}"
                  .format(progress["batchId"], progress["numInputRows"], sources))
    time.sleep(1)

query.awaitTermination()

This code prints the number of rows loaded and the source description for each processed micro-batch. Note that recentProgress reports batch-level metrics rather than individual file names; the per-file bookkeeping lives in the checkpoint itself.
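If you want to list those files directly, recent Databricks Runtimes expose a cloud_files_state table-valued function that returns the per-file state recorded in an Auto Loader checkpoint. A hedged sketch, assuming a runtime that supports the function and the checkpoint location used above:

# Query the files Auto Loader has discovered/ingested, straight from the checkpoint.
# Requires a Databricks Runtime that ships the cloud_files_state function.
ingested = spark.sql("SELECT * FROM cloud_files_state('/mnt/checkpoints')")
ingested.show(truncate=False)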

Arty
New Contributor II

@Suteja Kanuri Thank you!

Anonymous
Not applicable

Hi @Artem Sachuk

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
