Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-23-2025 05:26 PM
Hi @JuliandaCruz - Instead of using Python, I suggest using Spark from DBConnect to achieve the same result. Code given below.
Once you save the Python file (I saved it as list_and_upload_files.py), use a .env file to put the variables.
.env file content -
DATABRICKS_PROFILE=<<...>>
VOLUMES_FOLDER=/Volumes/catalog/schema/volume/folder/
UPLOAD_DESTINATION=/Volumes/catalog/schema/volume/uploads/
Finally, use the command below to run it -
python list_and_upload_files.py --action upload --files 1.pdf
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv
import os
import shutil
import argparse
import io
class DatabricksFileManager:
    """Manage file listing and uploads for Databricks Volumes.

    Listing runs a ``LIST`` SQL command through a Databricks Connect Spark
    session; uploads go through the Databricks SDK Files API. The instance
    can be used as a context manager, which connects on entry and
    disconnects on exit.
    """

    def __init__(self, profile_name):
        """Store the profile name; no connection is opened yet.

        Args:
            profile_name: Name of the Databricks profile to use.
        """
        self.profile_name = profile_name
        self.spark = None
        self.workspace_client = None

    def connect(self):
        """Create the Spark session and the SDK workspace client."""
        print(f"Connecting to Databricks using profile: {self.profile_name}")
        # Both clients must use the configured profile; a hard-coded profile
        # name here would silently point Spark at a different workspace.
        self.spark = (
            DatabricksSession.builder
            .profile(self.profile_name)
            .serverless()
            .getOrCreate()
        )
        self.workspace_client = WorkspaceClient(profile=self.profile_name)
        print("Connection established successfully!")

    def disconnect(self):
        """Stop the Spark session (if any) and drop both client handles."""
        if self.spark:
            self.spark.stop()
            print("Databricks connection closed.")
        # Reset so the "not connected" guards fire if the object is reused.
        self.spark = None
        self.workspace_client = None

    def list_files(self, folder_path, file_extension=None):
        """List files in a Databricks Volumes folder.

        Args:
            folder_path: Path to the Volumes folder.
            file_extension: Optional extension to filter on (e.g. '.pdf' or
                'csv'); a leading dot is added when missing.

        Returns:
            List of file paths.

        Raises:
            RuntimeError: If connect() has not been called.
            ValueError: If folder_path is empty or None.
        """
        if not self.spark:
            raise RuntimeError("Not connected to Databricks. Call connect() first.")
        if not folder_path:
            # Fail early instead of sending "LIST 'None'" to the cluster.
            raise ValueError("folder_path must be a non-empty Volumes path.")

        # Use Spark SQL to list files in Databricks Volumes
        files_df = self.spark.sql(f"LIST '{folder_path}'")

        # Filter by file extension if provided
        if file_extension:
            if not file_extension.startswith('.'):
                file_extension = '.' + file_extension
            files_df = files_df.filter(files_df.name.endswith(file_extension))

        # Show the results
        print(f"Files found in {folder_path}:")
        files_df.select("path", "name", "size").show(truncate=False)

        # Get the paths as a list
        file_paths = [row.path for row in files_df.collect()]
        print(f"\nTotal files: {len(file_paths)}")
        return file_paths

    def list_pdf_files(self, folder_path):
        """List all PDF files in a Databricks Volumes folder.

        Args:
            folder_path: Path to the Volumes folder.

        Returns:
            List of PDF file paths.
        """
        return self.list_files(folder_path, file_extension='.pdf')

    def upload_file(self, local_file_path, destination_path):
        """Upload a single file to Databricks Volumes, overwriting any existing file.

        Args:
            local_file_path: Path to the local file to upload.
            destination_path: Destination path in Databricks Volumes
                (including filename).

        Returns:
            bool: True if the upload succeeded, False otherwise.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if not self.workspace_client:
            raise RuntimeError("Not connected to Databricks. Call connect() first.")
        if not os.path.exists(local_file_path):
            print(f"Error: Local file not found: {local_file_path}")
            return False

        try:
            # File size is only used for the success message.
            file_size_mb = os.path.getsize(local_file_path) / (1024 * 1024)
            # Hand the open binary handle to the Files API directly rather
            # than reading the whole file into memory first.
            with open(local_file_path, 'rb') as f:
                self.workspace_client.files.upload(
                    destination_path,
                    f,
                    overwrite=True,
                )
            print(f"✓ Successfully uploaded: {os.path.basename(local_file_path)} ({file_size_mb:.2f} MB)")
            return True
        except Exception as e:
            # Deliberately broad: one failed upload is reported and must not
            # abort a batch driven by upload_files().
            print(f"✗ Error uploading file: {str(e)}")
            return False

    def upload_files(self, local_files, destination_folder):
        """Upload multiple files to Databricks Volumes.

        Args:
            local_files: List of local file paths, a directory path (its
                immediate files are uploaded, not subdirectories), or a
                single file path given as a string.
            destination_folder: Destination folder path in Databricks Volumes.

        Returns:
            dict: Dictionary with 'success' and 'failed' lists of file paths.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if not self.workspace_client:
            raise RuntimeError("Not connected to Databricks. Call connect() first.")

        # Ensure destination folder ends with /
        if not destination_folder.endswith('/'):
            destination_folder += '/'

        if isinstance(local_files, str):
            if os.path.isdir(local_files):
                # Expand a directory into the regular files directly inside it.
                directory = local_files
                local_files = [
                    os.path.join(directory, f)
                    for f in os.listdir(directory)
                    if os.path.isfile(os.path.join(directory, f))
                ]
            else:
                # A lone path string would otherwise be iterated per character.
                local_files = [local_files]

        results = {'success': [], 'failed': []}
        print(f"\n📤 Uploading {len(local_files)} file(s) to {destination_folder}")
        print("-" * 60)

        for idx, local_file in enumerate(local_files, 1):
            filename = os.path.basename(local_file)
            destination_path = f"{destination_folder}{filename}"
            print(f"[{idx}/{len(local_files)}] {filename}...", end=" ")
            if self.upload_file(local_file, destination_path):
                results['success'].append(local_file)
            else:
                results['failed'].append(local_file)

        print("-" * 60)
        print(f"✅ Upload complete: {len(results['success'])} succeeded, {len(results['failed'])} failed")
        return results

    def __enter__(self):
        """Context manager entry: connect and return this manager."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: disconnect; exceptions are not suppressed."""
        self.disconnect()
def main():
    """Command-line entry point: list files in, or upload files to, a Volume.

    Reads DATABRICKS_PROFILE, VOLUMES_FOLDER and UPLOAD_DESTINATION from the
    environment (loaded from a .env file), validates the configuration and
    arguments for the chosen action, and only then connects to Databricks.
    Problems are reported on stdout and the function returns early.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description="Databricks File Manager - List and upload files to Databricks Volumes",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # %(prog)s keeps the examples in sync with the actual script name.
        epilog="""
Examples:
  # List PDF files
  python %(prog)s --action list

  # List files with specific extension
  python %(prog)s --action list --extension .csv

  # Upload a single file
  python %(prog)s --action upload --files /path/to/file.pdf

  # Upload multiple files
  python %(prog)s --action upload --files file1.pdf file2.pdf file3.pdf

  # Upload entire directory
  python %(prog)s --action upload --files /path/to/directory
""",
    )
    parser.add_argument(
        '--action',
        choices=['list', 'upload'],
        default='list',
        help='Action to perform: list or upload files (default: list)',
    )
    parser.add_argument(
        '--files',
        nargs='+',
        help='File(s) or directory to upload (required for upload action)',
    )
    parser.add_argument(
        '--extension',
        default='.pdf',
        help='File extension to filter when listing (default: .pdf)',
    )
    parser.add_argument(
        '--destination',
        help='Override destination folder from .env (optional)',
    )
    args = parser.parse_args()

    # Load environment variables from .env file
    load_dotenv()

    # Get configuration from environment variables
    profile_name = os.getenv("DATABRICKS_PROFILE")
    folder = os.getenv("VOLUMES_FOLDER")
    upload_destination = args.destination or os.getenv("UPLOAD_DESTINATION")

    # Validate everything up front so no connection is opened just to fail.
    if not profile_name:
        print("Error: DATABRICKS_PROFILE not set in .env file")
        return
    if args.action == 'list' and not folder:
        print("Error: VOLUMES_FOLDER not set in .env file")
        return
    if args.action == 'upload':
        if not args.files:
            print("Error: --files argument is required for upload action")
            print(f"Usage: python {parser.prog} --action upload --files <file1> [file2 ...]")
            return
        if not upload_destination:
            print("Error: Upload destination not set. Use --destination or set UPLOAD_DESTINATION in .env")
            return

    print(f"Using profile: {profile_name}")
    print(f"Folder path: {folder}\n")

    # Use the class with context manager (automatically handles connect/disconnect)
    with DatabricksFileManager(profile_name=profile_name) as file_manager:
        if args.action == 'list':
            # ===== List files =====
            print("=" * 60)
            print(f"LISTING FILES (*{args.extension})")
            print("=" * 60)
            # list_files covers the '.pdf' default too, so no special case.
            files = file_manager.list_files(folder, file_extension=args.extension)

            # Print the file paths
            print(f"\nFound {len(files)} file(s):")
            for path in files:
                print(f"  - {path}")

        elif args.action == 'upload':
            # ===== Upload files =====
            print("=" * 60)
            print("UPLOADING FILES")
            print("=" * 60)

            # A single directory argument is passed as a string so that
            # upload_files expands it; anything else is passed as a list.
            if len(args.files) == 1 and os.path.isdir(args.files[0]):
                results = file_manager.upload_files(args.files[0], upload_destination)
            else:
                results = file_manager.upload_files(args.files, upload_destination)

            # Summary
            print("\n📊 Summary:")
            print(f"  ✅ Succeeded: {len(results['success'])} file(s)")
            print(f"  ❌ Failed: {len(results['failed'])} file(s)")
            if results['failed']:
                print("\n❌ Failed files:")
                for failed_file in results['failed']:
                    print(f"  - {failed_file}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()