04-09-2023 05:10 AM
Hi All
Can you please advise how I can delete files from Azure Storage once they have been successfully loaded via Autoloader? As I understand it, the Spark Structured Streaming "cleanSource" option is not available for Autoloader, so I'm trying to find the best way to clean up the landing zone.
04-10-2023 06:00 AM
@Artem Sachuk:
One way to achieve landing zone cleansing is to use the Azure Storage SDK in a script or job after the successful load of the file via Autoloader.
First, you can use the Databricks dbutils.fs.ls() command to get the list of files in the landing zone directory. Then, using the Azure Storage SDK, you can delete the files that have already been loaded.
Here is an example Python code snippet that demonstrates this approach:
import os
from azure.storage.blob import BlobServiceClient

# Set your Azure Storage account details
account_name = "your_account_name"
account_key = "your_account_key"

# Set your landing zone directory path (a DBFS mount of the storage container)
landing_zone_path = "/mnt/landing_zone"

# Create a BlobServiceClient object to connect to your storage account
connect_str = (
    "DefaultEndpointsProtocol=https;AccountName={};AccountKey={};"
    "EndpointSuffix=core.windows.net"
).format(account_name, account_key)
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_name = "your_container_name"

# Get the list of files in the landing zone directory
landing_zone_files = dbutils.fs.ls(landing_zone_path)

# Loop through each file in the landing zone
for file in landing_zone_files:
    file_path = file.path
    file_name = os.path.basename(file_path.rstrip("/"))

    # Check if the file has already been loaded
    # (you may need to modify this logic based on your specific use case)
    if file_name.startswith("loaded_"):
        # Delete the blob from Azure Storage. The blob name must be the path
        # relative to the container, so adjust it if your files sit in subfolders.
        blob_client = blob_service_client.get_blob_client(
            container=container_name, blob=file_name
        )
        blob_client.delete_blob()
        # No separate dbutils.fs.rm() call is needed: if /mnt/landing_zone is a
        # mount of the same container, deleting the blob already removes the file.
Note that in this example, the landing zone files that have already been loaded are assumed to have a prefix of "loaded_". You may need to modify the logic to suit your specific use case. Also, make sure to replace the account_name, account_key, landing_zone_path, and container_name variables with your own values.
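If a filename prefix is not practical, another option is to drive the cleanup from a record of which files were actually loaded, for example a column populated with input_file_name() in the target table. A rough sketch, not a drop-in solution; the Delta table name bronze_events and its source_file column below are hypothetical placeholders:
# Rough sketch: "bronze_events" and "source_file" are placeholders for wherever
# you record the full source path of each loaded row.
landing_zone_path = "/mnt/landing_zone"

loaded_files = {
    row.source_file
    for row in spark.table("bronze_events").select("source_file").distinct().collect()
}

for file in dbutils.fs.ls(landing_zone_path):
    # dbutils.fs.ls() and input_file_name() should both yield dbfs:/mnt/... paths
    # for a mounted container, but normalize the scheme/prefix if they differ.
    if file.path in loaded_files:
        dbutils.fs.rm(file.path)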
04-11-2023 07:12 AM
Thank you very much, @Suteja Kanuri.
I guess the only thing I would need is to understand which files were loaded, right?
Can I somehow query the Autoloader checkpoints, or do I need to keep track of the files I loaded with something like input_file_name() added to my data?
04-14-2023 10:00 AM
@Artem Sachuk:
Yes, to keep track of which files were loaded by Autoloader, you can add a column populated by the input_file_name() function to your DataFrame during the loading process. This way, you know exactly which file each row of data came from.
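A minimal sketch of that approach (the file format, landing zone path, and the source_file column name are placeholders to adapt):
from pyspark.sql.functions import input_file_name

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")           # placeholder: match your source format
    .load("/mnt/landing_zone")                     # placeholder landing zone path
    .withColumn("source_file", input_file_name())  # records which file each row came from
)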
Alternatively, you can also use Autoloader's checkpoint mechanism to keep track of which files have been successfully loaded. The checkpoint stores file metadata (such as file name, size, and modification time) in the checkpoint location, and you can query that state to see which files have been processed and loaded successfully.
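On recent Databricks Runtime versions you can list the files an Autoloader stream has discovered by querying its checkpoint with the cloud_files_state SQL function. A sketch; the checkpoint path is a placeholder, and availability depends on your runtime version:
# Placeholder: use the checkpointLocation of your Autoloader stream.
checkpoint_path = "/mnt/checkpoints/my_autoloader_stream"

# cloud_files_state exposes the file-level state stored in the checkpoint;
# it requires a sufficiently recent Databricks Runtime.
spark.sql("SELECT * FROM cloud_files_state('{}')".format(checkpoint_path)).show(truncate=False)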
To enable checkpointing, you need to specify a checkpoint location for the stream. Note that the checkpointLocation option is set on the writeStream, not on the readStream; the read side only needs the cloudFiles source options. For example:
checkpoint_dir = "/mnt/checkpoints"

autoloader = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")  # match your source file format
    .load(path)
)
The checkpoint directory can be any path in a file system that supports write access. Make sure that the checkpoint directory is on a durable file system (such as HDFS or Azure Blob Storage) and not on a local disk, as the checkpoint information needs to survive driver or executor failures.
Once the checkpoint location is set, the stream writes its progress and file-discovery state there. While the query is running, you can also inspect its progress through the StreamingQuery.recentProgress property, which returns information about the most recent micro-batches.
For example:
query = (autoloader.writeStream
    .option("checkpointLocation", checkpoint_dir)
    .outputMode("append")
    .format("console")
    .start()
)
import time

# Poll the query's progress and print how many rows each micro-batch loaded.
seen_batches = set()
while query.isActive:
    for progress in query.recentProgress:
        if progress["batchId"] not in seen_batches:
            seen_batches.add(progress["batchId"])
            print("Batch {} loaded {} rows".format(
                progress["batchId"], progress["numInputRows"]))
    time.sleep(1)
query.awaitTermination()
This code prints the number of rows loaded for each processed micro-batch. To see which specific files were picked up, use the input_file_name() column or query the checkpoint state as described above.
04-14-2023 11:56 AM
@Suteja Kanuri Thank you!
04-11-2023 06:34 AM
Hi @Artem Sachuk
Thank you for posting your question in our community! We are happy to assist you.
To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?
This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance!