I have a set of .hdf files that I want to distribute to worker nodes in a Databricks environment and read there with PySpark. I am able to read the .hdf files on the worker nodes and extract the data from them.
The next requirement is that each worker node now modifies its respective .hdf file and the final updated file is stored back to a DBFS location. However, I am not able to persist the modifications: the .hdf files are read and written through h5py, and the changes are saved in the worker's local copy, but after I copy the files from the worker back to DBFS, the changes are no longer visible. Any suggestion would be highly appreciated.
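For reference, the same append-and-reopen round trip behaves as expected when everything stays on a single local path, so the h5py part itself seems fine. A minimal sketch (the path /tmp/local_test.hdf and the values are just placeholders):

import h5py
import numpy as np

local_path = '/tmp/local_test.hdf'  # placeholder local path

# create the file with an initial dataset, like the original files on DBFS
with h5py.File(local_path, 'w') as f:
    f.create_dataset('default', data=np.arange(10))

# append a second dataset, mimicking what each worker should do
with h5py.File(local_path, 'a') as f:
    f.create_dataset('test_dataset_44', data=np.array([111, 222, 333, 444, 555]))

# reopen and confirm that both datasets are present
with h5py.File(local_path, 'r') as f:
    print(list(f.keys()))  # ['default', 'test_dataset_44']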
Here is the shortened code:
def change_to_hdf2(dname, file_name, data):
    import h5py, os, subprocess
    # append a new dataset to the worker's local copy of the file
    with h5py.File(file_name, 'a') as f:
        dset = f.create_dataset(dname, data=data)
    # copy the modified file from the worker's working directory back to dbfs
    p = subprocess.Popen("cp {0} {1}".format(os.path.join(os.getcwd(), file_name), os.path.join('/dbfs/mnt', file_name)), stdout=subprocess.PIPE, shell=True)
    return 'Copied back'
def create_hdf_file2(file):
    import h5py, os, subprocess
    import pandas as pd
    # copy the hdf file from dbfs to the worker's working directory
    p = subprocess.Popen("cp {0} {1}".format(file, os.getcwd()), stdout=subprocess.PIPE, shell=True)
    dbfs_base_path, dbfs_fname = os.path.split(file)  # separate the file name from its path
    data_updated = [111, 222, 333, 444, 555]
    df_data2_updated = pd.DataFrame(data_updated, columns=['Numbers'])
    change_to_hdf2('test_dataset_44', dbfs_fname, df_data2_updated)
    return True
def read_hdf_file2(file_name, dname):
    import h5py
    with h5py.File(file_name, 'r') as f:
        # read the dataset into memory before the file is closed
        data = f[dname][:]
    print(data[:5])
    return data
# main code
file_names = ['/dbfs/mnt/file1.hdf', '/dbfs/mnt/file2.hdf']
rdd = spark.sparkContext.parallelize(file_names)
result = rdd.map(lambda x: create_hdf_file2(x)).collect()
print(result)
# ---------------- read the files from dbfs/mnt after the workers copied them back
# Test code
read_hdf_file2('/dbfs/mnt/file1.hdf', dname='default')          # works, since 'default' existed in the original file
read_hdf_file2('/dbfs/mnt/file1.hdf', dname='test_dataset_44')  # fails
# in the line above, 'test_dataset_44' is not found in dbfs/mnt/file1.hdf
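To double-check what actually lands on DBFS, here is a small diagnostic snippet (not part of the job) that lists every dataset in the copied file; 'test_dataset_44' is missing from the output:

import h5py

# diagnostic only: list all datasets in the file copied back to dbfs
with h5py.File('/dbfs/mnt/file1.hdf', 'r') as f:
    print(list(f.keys()))  # 'test_dataset_44' does not appear here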