4 weeks ago
Hi @JuliandaCruz - Instead of using Python's glob, I suggest using Spark via Databricks Connect (DBConnect) to achieve the same result. Code is given below.
Once you save the Python file (I saved it as list_and_upload_files.py), put the configuration variables in a .env file.
.env file content -
DATABRICKS_PROFILE=<<...>>
VOLUMES_FOLDER=/Volumes/catalog/schema/volume/folder/
UPLOAD_DESTINATION=/Volumes/catalog/schema/volume/uploads/
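The DATABRICKS_PROFILE value refers to a named authentication profile in your local ~/.databrickscfg file. As a sketch, such a profile looks like this (host and token are placeholders):
[my-profile]
host  = https://<workspace-url>
token = <personal-access-token>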
Finally, use the command below to run it -
python list_and_upload_files.py --action upload --files 1.pdf

list_and_upload_files.py -

from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv
import os
import argparse
import io


class DatabricksFileManager:
    """
    A class to manage file operations in Databricks Volumes.
    """

    def __init__(self, profile_name):
        """
        Initialize the DatabricksFileManager with a profile.

        Args:
            profile_name: Name of the Databricks profile to use
        """
        self.profile_name = profile_name
        self.spark = None
        self.workspace_client = None

    def connect(self):
        """
        Establish connection to Databricks using the configured profile.
        """
        print(f"Connecting to Databricks using profile: {self.profile_name}")
        self.spark = DatabricksSession.builder.profile(self.profile_name).serverless().getOrCreate()
        self.workspace_client = WorkspaceClient(profile=self.profile_name)
        print("Connection established successfully!")

    def disconnect(self):
        """
        Close the Databricks connection.
        """
        if self.spark:
            self.spark.stop()
            print("Databricks connection closed.")

    def list_files(self, folder_path, file_extension=None):
        """
        List files in a Databricks Volumes folder.

        Args:
            folder_path: Path to the Volumes folder
            file_extension: Optional file extension to filter (e.g., '.pdf', '.csv')

        Returns:
            List of file paths
        """
        if not self.spark:
            raise Exception("Not connected to Databricks. Call connect() first.")

        # Use Spark SQL to list files in Databricks Volumes
        files_df = self.spark.sql(f"LIST '{folder_path}'")

        # Filter by file extension if provided
        if file_extension:
            if not file_extension.startswith('.'):
                file_extension = '.' + file_extension
            files_df = files_df.filter(files_df.name.endswith(file_extension))

        # Show the results
        print(f"Files found in {folder_path}:")
        files_df.select("path", "name", "size").show(truncate=False)

        # Get the paths as a list
        file_paths = [row.path for row in files_df.collect()]
        print(f"\nTotal files: {len(file_paths)}")
        return file_paths

    def list_pdf_files(self, folder_path):
        """
        List all PDF files in a Databricks Volumes folder.

        Args:
            folder_path: Path to the Volumes folder

        Returns:
            List of PDF file paths
        """
        return self.list_files(folder_path, file_extension='.pdf')

    def upload_file(self, local_file_path, destination_path):
        """
        Upload a single file to Databricks Volumes.

        Args:
            local_file_path: Path to the local file to upload
            destination_path: Destination path in Databricks Volumes (including filename)

        Returns:
            bool: True if upload successful, False otherwise
        """
        if not self.workspace_client:
            raise Exception("Not connected to Databricks. Call connect() first.")

        if not os.path.exists(local_file_path):
            print(f"Error: Local file not found: {local_file_path}")
            return False

        try:
            # Get file size for progress tracking
            file_size = os.path.getsize(local_file_path)
            file_size_mb = file_size / (1024 * 1024)

            # Upload using the Databricks SDK Files API
            with open(local_file_path, 'rb') as f:
                file_content = f.read()

            # Wrap bytes in BytesIO to provide a file-like interface
            file_obj = io.BytesIO(file_content)
            self.workspace_client.files.upload(
                destination_path,
                file_obj,
                overwrite=True
            )
            print(f"✓ Successfully uploaded: {os.path.basename(local_file_path)} ({file_size_mb:.2f} MB)")
            return True
        except Exception as e:
            print(f"✗ Error uploading file: {str(e)}")
            return False

    def upload_files(self, local_files, destination_folder):
        """
        Upload multiple files to Databricks Volumes.

        Args:
            local_files: List of local file paths or a directory path
            destination_folder: Destination folder path in Databricks Volumes

        Returns:
            dict: Dictionary with 'success' and 'failed' lists of file paths
        """
        if not self.workspace_client:
            raise Exception("Not connected to Databricks. Call connect() first.")

        # Ensure destination folder ends with /
        if not destination_folder.endswith('/'):
            destination_folder += '/'

        # Handle the case where local_files is a directory
        if isinstance(local_files, str) and os.path.isdir(local_files):
            directory = local_files
            local_files = [
                os.path.join(directory, f)
                for f in os.listdir(directory)
                if os.path.isfile(os.path.join(directory, f))
            ]

        results = {'success': [], 'failed': []}
        print(f"\n📤 Uploading {len(local_files)} file(s) to {destination_folder}")
        print("-" * 60)

        for idx, local_file in enumerate(local_files, 1):
            filename = os.path.basename(local_file)
            destination_path = f"{destination_folder}{filename}"
            print(f"[{idx}/{len(local_files)}] {filename}...", end=" ")
            if self.upload_file(local_file, destination_path):
                results['success'].append(local_file)
            else:
                results['failed'].append(local_file)

        print("-" * 60)
        print(f"✅ Upload complete: {len(results['success'])} succeeded, {len(results['failed'])} failed")
        return results

    def __enter__(self):
        """Context manager entry."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.disconnect()


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description="Databricks File Manager - List and upload files to Databricks Volumes",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # List PDF files
  python list_and_upload_files.py --action list

  # List files with a specific extension
  python list_and_upload_files.py --action list --extension .csv

  # Upload a single file
  python list_and_upload_files.py --action upload --files /path/to/file.pdf

  # Upload multiple files
  python list_and_upload_files.py --action upload --files file1.pdf file2.pdf file3.pdf

  # Upload an entire directory
  python list_and_upload_files.py --action upload --files /path/to/directory
"""
    )
    parser.add_argument(
        '--action',
        choices=['list', 'upload'],
        default='list',
        help='Action to perform: list or upload files (default: list)'
    )
    parser.add_argument(
        '--files',
        nargs='+',
        help='File(s) or directory to upload (required for upload action)'
    )
    parser.add_argument(
        '--extension',
        default='.pdf',
        help='File extension to filter when listing (default: .pdf)'
    )
    parser.add_argument(
        '--destination',
        help='Override destination folder from .env (optional)'
    )
    args = parser.parse_args()

    # Load environment variables from the .env file
    load_dotenv()

    # Get configuration from environment variables
    profile_name = os.getenv("DATABRICKS_PROFILE")
    folder = os.getenv("VOLUMES_FOLDER")
    upload_destination = args.destination or os.getenv("UPLOAD_DESTINATION")

    if not profile_name:
        print("Error: DATABRICKS_PROFILE not set in .env file")
        return

    print(f"Using profile: {profile_name}")
    print(f"Folder path: {folder}\n")

    # Use the class with a context manager (automatically handles connect/disconnect)
    with DatabricksFileManager(profile_name=profile_name) as file_manager:
        if args.action == 'list':
            # ===== List files =====
            print("=" * 60)
            print(f"LISTING FILES (*{args.extension})")
            print("=" * 60)
            if args.extension == '.pdf':
                files = file_manager.list_pdf_files(folder)
            else:
                files = file_manager.list_files(folder, file_extension=args.extension)

            # Print the file paths
            print(f"\nFound {len(files)} file(s):")
            for path in files:
                print(f"  - {path}")

        elif args.action == 'upload':
            # ===== Upload files =====
            if not args.files:
                print("Error: --files argument is required for upload action")
                print("Usage: python list_and_upload_files.py --action upload --files <file1> [file2 ...]")
                return
            if not upload_destination:
                print("Error: Upload destination not set. Use --destination or set UPLOAD_DESTINATION in .env")
                return

            print("=" * 60)
            print("UPLOADING FILES")
            print("=" * 60)

            # Check whether it's a single directory or multiple files
            if len(args.files) == 1 and os.path.isdir(args.files[0]):
                # Upload a directory
                results = file_manager.upload_files(args.files[0], upload_destination)
            else:
                # Upload specific files
                results = file_manager.upload_files(args.files, upload_destination)

            # Summary
            print(f"\n📊 Summary:")
            print(f"  ✅ Succeeded: {len(results['success'])} file(s)")
            print(f"  ❌ Failed: {len(results['failed'])} file(s)")
            if results['failed']:
                print("\n❌ Failed files:")
                for failed_file in results['failed']:
                    print(f"  - {failed_file}")


if __name__ == "__main__":
    main()
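If the required packages are not installed yet, the script depends on databricks-connect, databricks-sdk, and python-dotenv (assuming pip/PyPI):
pip install databricks-connect databricks-sdk python-dotenv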
4 weeks ago
Hi dkushari,
thanks for the detailed answer, but I don't want to adapt my whole project to new code just because the connector isn't working as intended anymore.
My glob example is just a short code snippet that describes the general problem.
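For illustration, it boils down to something like this (the path is a placeholder, not our real one):

import glob

# Hypothetical sketch of the failing pattern: glob against a Volumes path.
# Worked until this week under "Run current file with Databricks Connect";
# now it returns an empty list.
files = glob.glob("/Volumes/catalog/schema/volume/folder/*")
print(files)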
Our project consists of over 10,000 lines of code, and when problems occur I need to use the VS Code debugger, which worked just fine until the start of this week.
Do you have any idea why this isn't working anymore?
4 weeks ago
Thank you for reaching out!
I was able to reproduce your case while using Databricks Connect. The "Upload and Run file" option worked fine and returned results, which is essentially the same as running from the Databricks UI.
However, when running "Run current file with Databricks Connect", the same code returned no values. This option leverages a mix of your local environment and the cluster's environment, hence the importance of matching Python versions. Even so, I was unable to obtain results using glob, and I could not find a specific reason or documented limitation.
You can follow the approach suggested by @dkushari, or a faster workaround is to leverage:
- the LIST command in SQL or dbutils.fs.ls in Python, which returns: path, name, size, and modification time
# Define Volume path
volume_path = "/Volumes/..."

# Get the files from the specified Volume
files = dbutils.fs.ls(volume_path)

# Parse and extract specific fields:
# path, name, size, modification time
for f in files:
    print(f.path, f.name, f.size, f.modificationTime)
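If you are running through Databricks Connect rather than in a notebook (where dbutils is available), the SQL LIST route is a minimal sketch like this (the profile name and Volume path are placeholders):

from databricks.connect import DatabricksSession

# Placeholder profile and path - adjust to your setup
spark = DatabricksSession.builder.profile("my-profile").serverless().getOrCreate()

# LIST returns path, name, size, and modification_time per file
spark.sql("LIST '/Volumes/catalog/schema/volume/folder/'").show(truncate=False)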
I hope this works!
Thank you,
4 weeks ago
Hi mmayorga, thanks for the answer, but as I described above, this was working before, and I am looking for the specific reason why it isn't working anymore.
Something must have changed, and since it isn't working for you anymore either, it is probably something on Databricks' side.