Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Read proprietary files and transform contents to a table - error resilient process needed

Gerhard
New Contributor III

We have data stored in HDF5 files in a "proprietary" way. This data needs to be read, converted and transformed before it can be inserted into a Delta table.

All of this transformation is done in a custom Python function that takes the HDF5 file and returns a list of tuples. The input files are about 50 MB in size. The output is a table with about 1500 rows and 2000 columns for each file.

This is what I do:

In order to process the files in parallel, I create an RDD with the file contents (binary) and the number of partitions equal to the number of files. Then I use rdd.map() to run the custom function on each file. This partitioning forces one task per file. The output is an RDD that then gets converted to a DataFrame and inserted into a Delta table.

import io
import os

import h5py
from pyspark.sql.functions import explode

df_files  # dataframe with the list of files to be processed
rcount = df_files.count()
list_of_files = df_files.rdd.map(lambda x: x.FQNFile).collect()
rdd_files = spark.read.format("binaryFile").load(list_of_files).repartition(rcount).rdd
 
 
def parse_hdf5(row, pandas_df_with_mapping):
    file = row['path']
    file_name = os.path.basename(file)

    # read the HDF5 file from the binary content
    bio = io.BytesIO(bytes(row['content']))
    h5 = h5py.File(bio, 'r')

    # process the h5 file:
    # there are some steps to extract the data,
    # map it, transpose it, etc. (omitted here);
    # these steps build dict_of_data for each timestamp

    data_list = []
    for timestamp in h5:
        tup = (file_name, timestamp, dict_of_data)
        data_list.append(tup)

    return data_list
 
 
rdd2 = rdd_files.map(lambda x: parse_hdf5(x, pandas_df_with_mapping))
sch = ...  # the Spark schema of the parsed output (omitted)
df = spark.createDataFrame(rdd2, sch)
df2 = df.select(explode(df.value).alias('t')) \
    .select('t.file_name', 't.timestamp', 't.struct_of_data')
 
# further process df2 to a delta table
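# As an example, the final step is an append into a Delta table
# (the table name "my_schema.hdf5_data" is just a placeholder):
df2.write.format("delta").mode("append").saveAsTable("my_schema.hdf5_data")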

Problems:

1. Executor timeout

From time to time the whole Spark job fails with the error message "Executor heartbeat timed out". This is an error I cannot reproduce: sometimes the job works, sometimes it fails with this error. When I restart the job right away, it can succeed without any change.

I have searched the logs (driver and executor) and have not found a single hint that points me in the direction of how to solve this. The cluster metrics do not show any significant issues with CPU or memory utilization.

Any ideas on how to solve this are welcome!
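For reference, the only related settings I am aware of are the executor heartbeat interval and the network timeout. I have not confirmed that raising them in the cluster Spark config actually fixes the root cause (it may just mask whatever makes the executors unresponsive), but this is how I check the current values:

# These are static settings that belong in the cluster's Spark config, not set at runtime:
#   spark.executor.heartbeatInterval   e.g. 60s  (default 10s)
#   spark.network.timeout              e.g. 600s (default 120s; should stay well above the heartbeat interval)
print(spark.conf.get("spark.executor.heartbeatInterval", "10s"))
print(spark.conf.get("spark.network.timeout", "120s"))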

2. Corrupt HDF5 file stops the whole job

If one of the HDF5 files is corrupt, the failing task breaks the whole job and discards all the correct work from the other tasks.

My main question is: is there a better way to process "proprietary" files with a Python function mapped over an RDD, like I do?

Is there a way to make the process error resilient, so that even if one task fails the rest of the job still finishes? Ideally with a hint on which task/file failed.

The only alternative I see is processing the files in a loop on the driver, which is not parallel.
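To make it concrete, the kind of error resilience I have in mind looks roughly like the sketch below: wrap the parser in a try/except so that a corrupt file produces a single marker record (file name plus exception message) instead of failing the whole job. The marker key '__error__' is just a placeholder I made up:

def parse_hdf5_safe(row, pandas_df_with_mapping):
    # Wrapper around parse_hdf5 so that one corrupt HDF5 file does not
    # fail the whole Spark job; the bad file is reported as a marker row.
    try:
        return parse_hdf5(row, pandas_df_with_mapping)
    except Exception as e:
        file_name = os.path.basename(row['path'])
        # keep the (file_name, timestamp, data) shape; timestamp is None for errors
        return [(file_name, None, {'__error__': str(e)})]

rdd2 = rdd_files.map(lambda x: parse_hdf5_safe(x, pandas_df_with_mapping))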

Note:

Two things should be noted, as they are not straightforward.

1. Because of a requirement to protect all data in our Databricks workspace, we use credential passthrough. As a result, we cannot read the HDF5 files directly from a mount inside the Python function.

Instead, I have to read the HDF5 files into an RDD with spark.read.format("binaryFile") and then map the function against it.
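For completeness: each row that the binaryFile reader returns has the columns path, modificationTime, length and content (the raw bytes), which is why the parse function works on row['path'] and row['content']:

# Quick way to inspect what the binaryFile reader returns per file:
spark.read.format("binaryFile").load(list_of_files).select("path", "length").show(truncate=False)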

2. The list of columns stored in the HDF5 file can vary, and therefore I have decided to use a struct in the result table. So the tuple returned from the function actually contains a key/value map. But this should not be the problem here.
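To illustrate, the per-row schema after the explode looks roughly like this; using StringType for the timestamp and DoubleType for the map values is a simplification, the real types vary:

from pyspark.sql.types import StructType, StructField, StringType, MapType, DoubleType

# Rough shape of one exploded row: (file_name, timestamp, key/value map); types simplified.
row_schema = StructType([
    StructField("file_name", StringType()),
    StructField("timestamp", StringType()),
    StructField("struct_of_data", MapType(StringType(), DoubleType())),
])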

Thank you in advance!

