Transforming/Saving Python Class Instances to Delta Rows
01-03-2023 09:53 AM
I'm trying to reuse a Python package that performs a fairly complex series of steps to parse binary files into workable data in Delta format.
I have made the first part (binary file parsing) work with a UDF:
asffileparser = F.udf(File()._parseBytes, AsfFileDelta.getSchema())
# _parseBytes() is a method that transforms the binary content into a dictionary
df.withColumn("asffileparsed", asffileparser("filename", "content"))
This works because the full schema of the resulting dictionary is defined explicitly.
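For illustration, a minimal sketch of that pattern, with made-up field names and simple stand-ins for File()._parseBytes and AsfFileDelta.getSchema():

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, BinaryType

# Hypothetical stand-in for AsfFileDelta.getSchema(): every field of the
# returned dictionary is declared explicitly.
parsed_schema = StructType([
    StructField("header", StringType()),
    StructField("n_records", LongType()),
    StructField("payload", BinaryType()),
])

# Hypothetical stand-in for File()._parseBytes(): returns a plain dict whose
# keys match the declared schema and whose values are plain Python types.
def parse_bytes(filename, content):
    return {"header": filename, "n_records": len(content), "payload": content}

asffileparser = F.udf(parse_bytes, parsed_schema)
df = df.withColumn("asffileparsed", asffileparser("filename", "content"))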
However, the second part (the FFT transform) uses a separate processor that populates pandas DataFrames. A similar approach to the one above:
spectrumparser = F.udf(lambda asffile_dict: vars(Spectrum(asffile_dict)), SpectrumDelta.getSchema())
# The Spectrum() init method generates & populates several pandas DataFrames
df.withColumn("spectrum", spectrumparser(F.struct("*")))
runs into a Spark error as soon as the first pandas DataFrame is instantiated:
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
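For what it's worth, my understanding is that this class of error appears whenever the value a UDF returns still contains pandas objects (a DataFrame, Series or Index) rather than plain Python types matching the declared schema. A toy illustration (schema and names below are made up, not from my package):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType

toy_schema = StructType([
    StructField("frequencies", ArrayType(DoubleType())),
    StructField("amplitudes", ArrayType(DoubleType())),
])

def to_spectrum(row):
    pdf = pd.DataFrame({"frequencies": [1.0, 2.0], "amplitudes": [0.1, 0.2]})
    # Returning pandas Series (which carry an Index) is what the JVM-side
    # unpickler cannot reconstruct (pandas.core.indexes.base._new_Index).
    return {"frequencies": pdf["frequencies"], "amplitudes": pdf["amplitudes"]}

spectrum_udf = F.udf(to_spectrum, toy_schema)  # evaluating this raises the PickleException above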
Is the only solution really to rewrite the entire package natively in pyspark?
See also my StackOverflow Question:
https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows
01-05-2023 09:45 PM
Hi, did you try to follow the error's suggestion, "Fix it by registering a custom IObjectConstructor for this class"?
Also, could you please share the full error with us?
01-17-2023 12:55 AM
Hi debayan! Thanks for getting back to me. How would I register such a constructor?
Below is the full stderr thrown by Spark for the following command:
spectrumparser = F.udf(lambda inputDict: vars(Spectrum(inputDict["filename"], inputDict["path"], dict_=inputDict)), SpectrumDelta.getSchema())
spectrumStream = (asffiles
    .withColumn("spectrum", spectrumparser(F.struct("*")))
    .withColumn("batch_number", F.regexp_extract("filename", r'([^_]+)', 1))
    .withColumn("spec_package_version", F.lit(__version__))
    .select("path", "filename", "batch_number", "spec_package_version", "spectrum.*")
    .display()
)
23/01/17 08:50:07 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 21)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:110)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Spark retries 4 times, then the command fails with the same error message.
The underlying Python class "Spectrum" initializes a subclass "SpectrumProcessor", which prepares five pandas DataFrames.
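If it helps the discussion, here is a rough sketch of the kind of wrapper I could imagine, assuming the workaround is to convert each of those DataFrames into plain Python structures (e.g. a list of dicts matching an array<struct> field in SpectrumDelta.getSchema()) before the UDF returns. The attribute handling below is hypothetical, not the package's actual API:

from pyspark.sql import functions as F

def spectrum_to_row(input_dict):
    # Build the Spectrum exactly as before; its attributes include several pandas DataFrames.
    spec = Spectrum(input_dict["filename"], input_dict["path"], dict_=input_dict)
    row = dict(vars(spec))
    # Replace every DataFrame attribute with a list of plain dicts so that only
    # native Python types cross the Python/JVM boundary.
    for name, value in row.items():
        if hasattr(value, "to_dict"):  # crude check for a pandas DataFrame
            row[name] = value.to_dict("records")
    return row

spectrumparser = F.udf(spectrum_to_row, SpectrumDelta.getSchema())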