dkushari
Databricks Employee
Databricks Employee

Hi @JuliandaCruz - Instead of using plain Python file APIs, I suggest using Spark through Databricks Connect (DBConnect) to achieve the same result. The code is given below.

Once you have saved the Python file (I saved it as list_and_upload_files.py), create a .env file next to it to hold the configuration variables.

.env file content -

DATABRICKS_PROFILE=<<...>>
VOLUMES_FOLDER=/Volumes/catalog/schema/volume/folder/
UPLOAD_DESTINATION=/Volumes/catalog/schema/volume/uploads/

 

Finally, use the command below to run it -

python list_and_upload_files.py --action upload --files 1.pdf
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv
import os
import shutil
import argparse
import io


class DatabricksFileManager:
    """
    Manage file operations in Databricks Volumes.

    Listing is done through a Databricks Connect Spark session (SQL ``LIST``)
    and uploads through the Databricks SDK Files API. The class is a context
    manager: entering calls connect(), leaving calls disconnect().
    """

    def __init__(self, profile_name):
        """
        Initialize the DatabricksFileManager with a profile.

        No connection is opened here; call connect() or use ``with``.

        Args:
            profile_name: Name of the Databricks profile to use
        """
        self.profile_name = profile_name
        self.spark = None
        self.workspace_client = None

    def connect(self):
        """
        Establish connection to Databricks using the configured profile.

        Both the Spark session and the SDK client use ``self.profile_name``.
        """
        print(f"Connecting to Databricks using profile: {self.profile_name}")
        # Use the configured profile (not a hard-coded one) so listing and
        # uploading always talk to the same workspace.
        self.spark = (
            DatabricksSession.builder
            .profile(self.profile_name)
            .serverless()
            .getOrCreate()
        )
        self.workspace_client = WorkspaceClient(profile=self.profile_name)
        print("Connection established successfully!")

    def disconnect(self):
        """
        Close the Databricks connection.

        Safe to call when not connected. Handles are reset so later calls hit
        the "not connected" guards instead of using a stopped session.
        """
        if self.spark:
            try:
                self.spark.stop()
            finally:
                self.spark = None
            print("Databricks connection closed.")
        self.workspace_client = None

    def list_files(self, folder_path, file_extension=None):
        """
        List files in a Databricks Volumes folder.

        Args:
            folder_path: Path to the Volumes folder
            file_extension: Optional file extension to filter (e.g., '.pdf', '.csv');
                a missing leading dot is added automatically

        Returns:
            List of file paths

        Raises:
            RuntimeError: If connect() has not been called.
            ValueError: If folder_path is empty or contains a single quote.
        """
        if not self.spark:
            raise RuntimeError("Not connected to Databricks. Call connect() first.")
        if not folder_path:
            raise ValueError("folder_path must be a non-empty Volumes path")
        # The path is interpolated into a quoted SQL literal below; a quote
        # would terminate the literal and change the statement.
        if "'" in folder_path:
            raise ValueError(f"folder_path must not contain single quotes: {folder_path!r}")

        # Use Spark SQL to list files in Databricks Volumes
        files_df = self.spark.sql(f"LIST '{folder_path}'")

        # Filter by file extension if provided
        if file_extension:
            if not file_extension.startswith('.'):
                file_extension = '.' + file_extension
            files_df = files_df.filter(files_df["name"].endswith(file_extension))

        # Show the results
        print(f"Files found in {folder_path}:")
        files_df.select("path", "name", "size").show(truncate=False)

        # Get the paths as a list
        file_paths = [row.path for row in files_df.collect()]
        print(f"\nTotal files: {len(file_paths)}")

        return file_paths

    def list_pdf_files(self, folder_path):
        """
        List all PDF files in a Databricks Volumes folder.

        Args:
            folder_path: Path to the Volumes folder

        Returns:
            List of PDF file paths
        """
        return self.list_files(folder_path, file_extension='.pdf')

    def upload_file(self, local_file_path, destination_path):
        """
        Upload a single file to Databricks Volumes, overwriting any existing file.

        Failures are reported on stdout and signalled through the return
        value rather than raised, so a batch upload can continue.

        Args:
            local_file_path: Path to the local file to upload
            destination_path: Destination path in Databricks Volumes (including filename)

        Returns:
            bool: True if upload successful, False otherwise

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if not self.workspace_client:
            raise RuntimeError("Not connected to Databricks. Call connect() first.")

        if not os.path.exists(local_file_path):
            print(f"Error: Local file not found: {local_file_path}")
            return False

        try:
            # File size is only used for the success message
            file_size_mb = os.path.getsize(local_file_path) / (1024 * 1024)

            # Pass the open binary handle straight to the Files API instead of
            # loading the whole file into memory first.
            with open(local_file_path, 'rb') as f:
                self.workspace_client.files.upload(
                    destination_path,
                    f,
                    overwrite=True
                )

            print(f"✓ Successfully uploaded: {os.path.basename(local_file_path)} ({file_size_mb:.2f} MB)")
            return True

        except Exception as e:
            # Deliberate best-effort: report and let the caller count failures
            print(f"✗ Error uploading file: {str(e)}")
            return False

    def upload_files(self, local_files, destination_folder):
        """
        Upload multiple files to Databricks Volumes.

        Args:
            local_files: List of local file paths, a directory path (its
                immediate regular files are uploaded, non-recursively), or a
                single file path
            destination_folder: Destination folder path in Databricks Volumes

        Returns:
            dict: Dictionary with 'success' and 'failed' lists of file paths

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if not self.workspace_client:
            raise RuntimeError("Not connected to Databricks. Call connect() first.")

        # Ensure destination folder ends with /
        if not destination_folder.endswith('/'):
            destination_folder += '/'

        if isinstance(local_files, (str, os.PathLike)):
            if os.path.isdir(local_files):
                # Expand a directory into its immediate regular files
                directory = local_files
                local_files = [
                    os.path.join(directory, f)
                    for f in sorted(os.listdir(directory))
                    if os.path.isfile(os.path.join(directory, f))
                ]
            else:
                # A single path must not be iterated character by character
                local_files = [local_files]
        else:
            # Materialize so len() works for any iterable
            local_files = list(local_files)

        results = {'success': [], 'failed': []}

        print(f"\n📤 Uploading {len(local_files)} file(s) to {destination_folder}")
        print("-" * 60)

        for idx, local_file in enumerate(local_files, 1):
            filename = os.path.basename(local_file)
            destination_path = f"{destination_folder}{filename}"

            print(f"[{idx}/{len(local_files)}] {filename}...", end=" ")

            if self.upload_file(local_file, destination_path):
                results['success'].append(local_file)
            else:
                results['failed'].append(local_file)

        print("-" * 60)
        print(f"✅ Upload complete: {len(results['success'])} succeeded, {len(results['failed'])} failed")
        return results

    def __enter__(self):
        """Context manager entry: connect and return the manager."""
        try:
            self.connect()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so release a
            # partially opened connection here before propagating.
            self.disconnect()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: disconnect; exceptions are not suppressed."""
        self.disconnect()


def main():
    """
    Command-line entry point: list or upload files in Databricks Volumes.

    Configuration is read from environment variables, loaded from a .env
    file via load_dotenv():
      - DATABRICKS_PROFILE: profile used to connect (required)
      - VOLUMES_FOLDER: folder to list (required for --action list)
      - UPLOAD_DESTINATION: upload target folder (--destination overrides it)

    All argument and configuration problems are reported before a Databricks
    connection is opened. Errors are printed and the function returns None.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description="Databricks File Manager - List and upload files to Databricks Volumes",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # %(prog)s keeps the examples correct whatever the script is named
        epilog="""
Examples:
  # List PDF files
  python %(prog)s --action list
  
  # List files with specific extension
  python %(prog)s --action list --extension .csv
  
  # Upload a single file
  python %(prog)s --action upload --files /path/to/file.pdf
  
  # Upload multiple files
  python %(prog)s --action upload --files file1.pdf file2.pdf file3.pdf
  
  # Upload entire directory
  python %(prog)s --action upload --files /path/to/directory
        """
    )

    parser.add_argument(
        '--action',
        choices=['list', 'upload'],
        default='list',
        help='Action to perform: list or upload files (default: list)'
    )

    parser.add_argument(
        '--files',
        nargs='+',
        help='File(s) or directory to upload (required for upload action)'
    )

    parser.add_argument(
        '--extension',
        default='.pdf',
        help='File extension to filter when listing (default: .pdf)'
    )

    parser.add_argument(
        '--destination',
        help='Override destination folder from .env (optional)'
    )

    args = parser.parse_args()

    # Load environment variables from .env file
    load_dotenv()

    # Get configuration from environment variables
    profile_name = os.getenv("DATABRICKS_PROFILE")
    folder = os.getenv("VOLUMES_FOLDER")
    upload_destination = args.destination or os.getenv("UPLOAD_DESTINATION")

    if not profile_name:
        print("Error: DATABRICKS_PROFILE not set in .env file")
        return

    # Validate everything the chosen action needs before connecting, so a
    # simple mistake does not cost a round trip to Databricks.
    if args.action == 'list':
        if not folder:
            print("Error: VOLUMES_FOLDER not set in .env file")
            return
    elif args.action == 'upload':
        if not args.files:
            print("Error: --files argument is required for upload action")
            print(f"Usage: python {parser.prog} --action upload --files <file1> [file2 ...]")
            return

        if not upload_destination:
            print("Error: Upload destination not set. Use --destination or set UPLOAD_DESTINATION in .env")
            return

    print(f"Using profile: {profile_name}")
    print(f"Folder path: {folder}\n")

    # Use the class with context manager (automatically handles connect/disconnect)
    with DatabricksFileManager(profile_name=profile_name) as file_manager:

        if args.action == 'list':
            # ===== List files =====
            print("=" * 60)
            print(f"LISTING FILES (*{args.extension})")
            print("=" * 60)

            # list_files covers the default '.pdf' case as well
            files = file_manager.list_files(folder, file_extension=args.extension)

            # Print the file paths
            print(f"\nFound {len(files)} file(s):")
            for path in files:
                print(f"  - {path}")

        elif args.action == 'upload':
            # ===== Upload files =====
            print("=" * 60)
            print("UPLOADING FILES")
            print("=" * 60)

            # A lone directory argument is passed as a string so that
            # upload_files expands it; otherwise pass the list of files.
            if len(args.files) == 1 and os.path.isdir(args.files[0]):
                results = file_manager.upload_files(args.files[0], upload_destination)
            else:
                results = file_manager.upload_files(args.files, upload_destination)

            # Summary
            print("\n📊 Summary:")
            print(f"   ✅ Succeeded: {len(results['success'])} file(s)")
            print(f"   ❌ Failed: {len(results['failed'])} file(s)")

            if results['failed']:
                print("\n❌ Failed files:")
                for failed_file in results['failed']:
                    print(f"   - {failed_file}")


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()