I'm trying to reuse a Python package that performs a very complex series of transformations to parse binary files into workable data in Delta format.
I have made the first part (binary file parsing) work with a UDF:
asffileparser = F.udf(File()._parseBytes, AsfFileDelta.getSchema())
# _parseBytes() is a method that transforms the binary content into a dictionary
df.withColumn("asffileparsed", asffileparser("filename", "content"))
This works by explicitly defining the full schema of the resulting dictionary.
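For context, a stripped-down sketch of what that first step looks like (the schema fields, column names and parsing logic below are placeholders, not the real package):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Placeholder for AsfFileDelta.getSchema(): one field per key of the
# dictionary that _parseBytes() returns (the real schema is much larger).
asf_schema = StructType([
    StructField("header", StringType(), True),
    StructField("n_points", LongType(), True),
])

def parse_bytes(filename, content):
    # Stand-in for File()._parseBytes(): returns a plain dict of builtins,
    # which Spark maps onto the declared StructType without any trouble.
    return {"header": filename, "n_points": len(content)}

asffileparser = F.udf(parse_bytes, asf_schema)
df = df.withColumn("asffileparsed", asffileparser("filename", "content"))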
However, the second part (the FFT transform) uses a separate processor that populates pandas DataFrames. A similar approach to the one above:
spectrumparser = F.udf(lambda asffile_dict: vars(Spectrum(asffile_dict)), SpectrumDelta.getSchema())
# The Spectrum() init method generates & populates several pandas DataFrames
df.withColumn("spectrum", spectrumparser(F.struct("*")))
runs into a Spark error as soon as the first pandas DataFrame is instantiated:
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
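As far as I can tell, the problem is that the dict returned by the UDF still contains pandas objects (DataFrames/Series), which Spark's JVM-side unpickler (Pyrolite) cannot reconstruct. A stripped-down version of what my UDF effectively does, which I believe reproduces the same error (schema and values are made up):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType

spectrum_schema = StructType([
    StructField("frequencies", ArrayType(DoubleType()), True),
])

def build_spectrum(filename):
    pdf = pd.DataFrame({"frequencies": [1.0, 2.0, 3.0]})
    # Returning a pandas Series instead of a plain list is (I think) what
    # triggers the PickleException once the column is actually evaluated.
    return {"frequencies": pdf["frequencies"]}

spectrumparser_repro = F.udf(build_spectrum, spectrum_schema)
df.withColumn("spectrum", spectrumparser_repro("filename")).show()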
Is the only solution really to rewrite the entire package natively in pyspark?
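One workaround I'm considering instead of a full rewrite (untested, and I'm not sure it scales to the whole package): convert every pandas DataFrame to builtin Python types inside the UDF before returning, with SpectrumDelta.getSchema() declaring those fields as structs of arrays accordingly. Roughly:

import pandas as pd
from pyspark.sql import functions as F

def spectrum_to_plain(asffile_dict):
    spec = Spectrum(asffile_dict)
    out = {}
    for name, value in vars(spec).items():
        if isinstance(value, pd.DataFrame):
            # .tolist() yields plain Python scalars, so no pandas/numpy
            # objects end up in the UDF's return value
            out[name] = {col: value[col].tolist() for col in value.columns}
        else:
            out[name] = value
    return out

spectrumparser = F.udf(spectrum_to_plain, SpectrumDelta.getSchema())
df = df.withColumn("spectrum", spectrumparser(F.struct("*")))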
See also my StackOverflow Question:
https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows