cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

FileNotFoundError when using sftp to write to disk within jobs

akdm
Contributor

When I try to convert a notebook into a job I frequently run into an issue with writing to the local filesystem. For this particular example, I did all my notebook testing with a bytestream for small files. When I tried to run as a job, I used the method I had to save the download to disk but I keep getting a `FileNotFoundError`. An example code snippet is below with two methods I've tried:

# method 1
def sftp_read(sftp_object, prefix):
    key = f'{prefix}/{sftp_object}'
    if not os.path.exists('/local_disk0/tmp/'):
        os.makedirs('/local_disk0/tmp/')
    sftp.get(sftp_object, f'/local_disk0/tmp/{sftp_object}')
    # do stuff
    os.remove(f'/local_disk0/tmp/{sftp_object}')
 
# method 2
def sftp_read(sftp_object, prefix):
    key = f'{prefix}/{sftp_object}'
    dbutils.fs.mkdirs('file:/tmp/')
    sftp.get(sftp_object, f'file:/tmp/{sftp_object}')
    # do stuff
    dbutils.fs.rm(f'file:/tmp/{sftp_object}')
        
 
FileNotFoundError                         Traceback (most recent call last)
<command-3394785040378964> in <cell line: 1>()
      3     if dryrun:
      4         print(sftp_object)
----> 5     sftp_read(sftp_object, prefix)
 
<command-3394785040378909> in sftp_read(sftp_object, prefix)
     57             if not os.path.exists('/local_disk0/tmp/'):
     58                 os.makedirs('/local_disk0/tmp/')
---> 59             sftp.get(sftp_object, f'/local_disk0/tmp/{sftp_object}')
     60             # do stuff
     61             os.remove(f'/local_disk0/tmp/{sftp_object}')
 
/local_disk0/.ephemeral_nfs/envs/pythonEnv-1e9ce7e1-d7d5-4473-b8d6-dbe59be12302/lib/python3.9/site-packages/paramiko/sftp_client.py in get(self, remotepath, localpath, callback, prefetch)
    808             Added the ``prefetch`` keyword argument.
    809         """
--> 810         with open(localpath, "wb") as fl:
    811             size = self.getfo(remotepath, fl, callback, prefetch)
    812         s = os.stat(localpath)
 
FileNotFoundError: [Errno 2] No such file or directory: '/local_disk0/tmp/path/to/file.ext'

I have referenced the DFBS local files documentation as well: https://docs.databricks.com/files/index.html

Any suggestions or something I need to know about Jobs running in a different manner than notebooks?

1 ACCEPTED SOLUTION

Accepted Solutions

akdm
Contributor

I was able to fix it. It was an issue with the nested files on the SFTP. I had to ensure that the parent folders were being created as well. Splitting out the local path and file made it easier to ensure that it existed with os.path.exists() and os.makedirs()

def sftp_read(sftp_object, bucket, prefix):
    key = f'{prefix}/{sftp_object}'
    local_path = '/local_disk0/tmp'
    local_file = f'{local_path}/{os.path.basename(sftp_object)}'
    if not os.path.exists(local_path):
            os.makedirs(local_path)
    sftp.get(sftp_object, local_file)
     # do stuff
     os.remove(local_file)

All in all, not a databricks issue, just an issue that appeared on databricks.

View solution in original post

3 REPLIES 3

Debayan
Esteemed Contributor III
Esteemed Contributor III

Hi, Thanks for reaching out to community.databricks.com.

Could you please also mention where did you declare sftp_object? Also, how did you set the connection to SFTP? Was it with password or password less?

SFTP connection is done with a password

def connect_to_sftp(host: str, port: (str,int), username: str, password:str) -> paramiko.sftp_client.SFTPClient:
    stfp, transport = None, None
    try:
        transport = paramiko.Transport(host, port)
        transport.connect(username=username, password=password)
    except Exception as e:
        print(e)
        if transport is not None:
            transport.close()
    try:
        sftp = paramiko.SFTPClient.from_transport(transport)
        return sftp
    except Exception as e:
        print(e)
        if sftp is not None:
            sftp.close()

sftp_object is the name of the remote object:

for file in sftp.listdir(sftp_dir):
    sftp_object = f'{sftp_dir}/{file}'
    if dryrun:
        print(sftp_object)
    sftp_read(sftp_object, prefix)

akdm
Contributor

I was able to fix it. It was an issue with the nested files on the SFTP. I had to ensure that the parent folders were being created as well. Splitting out the local path and file made it easier to ensure that it existed with os.path.exists() and os.makedirs()

def sftp_read(sftp_object, bucket, prefix):
    key = f'{prefix}/{sftp_object}'
    local_path = '/local_disk0/tmp'
    local_file = f'{local_path}/{os.path.basename(sftp_object)}'
    if not os.path.exists(local_path):
            os.makedirs(local_path)
    sftp.get(sftp_object, local_file)
     # do stuff
     os.remove(local_file)

All in all, not a databricks issue, just an issue that appeared on databricks.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.