Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Able to read .hdf files but not able to write to .hdf files from worker nodes and save to dbfs

draculla1208
New Contributor

I have a set of .hdf files that I want to distribute to and read on worker nodes in a Databricks environment using PySpark. I am able to read the .hdf files on the worker nodes and get the data from them.

The next requirement is that each worker node now has to modify its respective .hdf file, and the final updated file should be stored at a DBFS location. However, I am not able to modify the contents of the .hdf files. The .hdf files are read and written with h5py. The changes are saved on the worker, but when I copy the files from the worker to DBFS, the changes are no longer visible. Any suggestion would be highly appreciated.

Here is the short code

def change_to_hdf2(dname, file_name, data):
    import numpy as np
    import h5py, os, subprocess
    with h5py.File(file_name, 'a') as f:
        dset = f.create_dataset(dname, data = data) 
    p = subprocess.Popen("cp {0} {1}".format(os.path.join(os.getcwd(), file_name), os.path.join('/dbfs/mnt', file_name)), stdout=subprocess.PIPE, shell=True) #copy from worker to dbfs
    return 'Copied back'
    
def create_hdf_file2(file):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd
    
    p = subprocess.Popen("cp {0} {1}".format(file, os.getcwd()), stdout=subprocess.PIPE, shell=True) #copy hdf files from dbfs to worker's directory
 
    dbfs_base_path, dbfs_fname = os.path.split(file) #separate file name
    data_updated = [111, 222, 333, 444, 555]
    df_data2_updated = pd.DataFrame(data_updated, columns=['Numbers'])
    change_to_hdf2('test_dataset_44', dbfs_fname, df_data2_updated)
    return True
 
def read_hdf_file2(file_name, dname):
    import numpy as np
    import h5py, os, subprocess
    import pandas as pd
    with h5py.File(file_name, 'r') as f:
        data = f[dname][:]  # read the dataset into memory before the file is closed
        print(data[:5])
    return data
 
#main code
file_name = ['/dbfs/mnt/file1.hdf', '/dbfs/mnt/file2.hdf']
rdd = spark.sparkContext.parallelize(file_name)
result = rdd.map(lambda x: create_hdf_file2(x)).collect()
print(result)
 
#----------------to read files from the dbfs/mnt after copying files from worker
#Test code
read_hdf_file2('/dbfs/mnt/file1.hdf', dname = 'default') # works, as 'default' existed in the original file
read_hdf_file2('/dbfs/mnt/file1.hdf', dname = 'test_dataset_44') # fails
 
# in the line above, 'test_dataset_44' is not found in /dbfs/mnt/file1.hdf
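For comparison, below is a minimal, self-contained sketch of the same worker-side flow, with the copies done synchronously via shutil.copy rather than a background cp through subprocess.Popen. The /dbfs/mnt file names and the 'test_dataset_44' dataset name are taken from the snippet above; the function name update_hdf_file and the temporary scratch directory are illustrative assumptions, not a confirmed fix for the behaviour described here.

# Sketch only (assumptions noted above): same flow, but each copy blocks until it
# finishes, and the dataset is written to a local copy on the worker before the
# file is pushed back to the /dbfs/mnt FUSE mount.
def update_hdf_file(dbfs_path):
    import os, shutil, tempfile
    import h5py
    import numpy as np

    local_dir = tempfile.mkdtemp()  # scratch directory on the worker
    local_path = os.path.join(local_dir, os.path.basename(dbfs_path))

    shutil.copy(dbfs_path, local_path)  # blocking copy: DBFS -> worker

    with h5py.File(local_path, 'a') as f:  # modify the local copy
        f.create_dataset('test_dataset_44', data=np.array([111, 222, 333, 444, 555]))

    shutil.copy(local_path, dbfs_path)  # blocking copy: worker -> DBFS
    return dbfs_path

file_names = ['/dbfs/mnt/file1.hdf', '/dbfs/mnt/file2.hdf']
print(spark.sparkContext.parallelize(file_names).map(update_hdf_file).collect())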

0 REPLIES
