Able to read .hdf files but not able to write to .hdf files from worker nodes and save to dbfs

draculla1208
New Contributor

I have a set of .hdf files that I want to distribute to and read on worker nodes in a Databricks environment using PySpark. I am able to read the .hdf files on the worker nodes and get the data out of them.

The next requirement is that each worker node now has to modify its respective .hdf file, and the final updated file should be stored to a DBFS location. However, I am not able to get the modified .hdf files onto DBFS. The .hdf files are read and written through h5py. The contents of the .hdf files are updated on the worker, but when I copy the files from the worker to DBFS, the changes are no longer visible. Any suggestion would be highly appreciated.
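For clarity, this is the per-file operation I am trying to perform on each worker, written as a plain single-node sketch. The /dbfs/mnt path, file names and dataset name are placeholders, and shutil.copy is used here instead of shelling out to cp, so the copy is guaranteed to finish before the function returns:

import shutil
import h5py

def add_dataset_and_copy_back(local_path, dbfs_path, dname, values):
    # Append a new dataset to the local copy of the .hdf file.
    with h5py.File(local_path, 'a') as f:
        f.create_dataset(dname, data=values)
    # shutil.copy blocks until the whole file has been written to the DBFS mount.
    shutil.copy(local_path, dbfs_path)

# Placeholder paths and dataset name, for illustration only.
add_dataset_and_copy_back('file1.hdf', '/dbfs/mnt/file1.hdf',
                          'test_dataset_44', [111, 222, 333, 444, 555])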

Here is a shortened version of the code:

def change_to_hdf2(dname, file_name, data):
    import numpy as np
    import h5py, os, subprocess
    # Add the new dataset to the local copy of the file on the worker.
    with h5py.File(file_name, 'a') as f:
        f.create_dataset(dname, data=data)
    # Copy the updated file from the worker's working directory back to DBFS.
    p = subprocess.Popen("cp {0} {1}".format(os.path.join(os.getcwd(), file_name),
                                             os.path.join('/dbfs/mnt', file_name)),
                         stdout=subprocess.PIPE, shell=True)
    return 'Copied back'
    
def create_hdf_file2(file):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd

    # Copy the .hdf file from DBFS into the worker's working directory.
    p = subprocess.Popen("cp {0} {1}".format(file, os.getcwd()),
                         stdout=subprocess.PIPE, shell=True)

    dbfs_base_path, dbfs_fname = os.path.split(file)  # separate the file name from its path
    list_data_updated = [111, 222, 333, 444, 555]
    df_data2_updated = pd.DataFrame(list_data_updated, columns=['Numbers'])
    change_to_hdf2('test_dataset_44', dbfs_fname, df_data2_updated)
    return True
 
def read_hdf_file2(file_name, dname):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd
    with h5py.File(file_name, 'r') as f:
        # Read the dataset into memory so it remains usable after the file is closed.
        data = f[dname][:]
        print(data[:5])
    return data
 
# main code
file_names = ['/dbfs/mnt/file1.hdf', '/dbfs/mnt/file2.hdf']
rdd = spark.sparkContext.parallelize(file_names)
result = rdd.map(lambda x: create_hdf_file2(x)).collect()
print(result)
 
# ---------------- read the files from /dbfs/mnt after they have been copied back from the workers
# Test code
read_hdf_file2('/dbfs/mnt/file1.hdf', dname='default')          # works, because 'default' existed in the original file
read_hdf_file2('/dbfs/mnt/file1.hdf', dname='test_dataset_44')  # fails

# In the line above, 'test_dataset_44' is not found in /dbfs/mnt/file1.hdf.
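
For reference, this is a minimal way to check which datasets are actually present in the copied file on the driver (assuming the /dbfs FUSE mount is readable there):

import h5py

# List the datasets in the file that was copied back to DBFS.
with h5py.File('/dbfs/mnt/file1.hdf', 'r') as f:
    print(list(f.keys()))  # shows 'default' but not 'test_dataset_44'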
