Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Transforming/Saving Python Class Instances to Delta Rows

wim_schmitz_per
New Contributor II

I'm trying to reuse a Python package that performs a fairly complex series of steps to parse binary files into workable data in Delta format.

I have made the first part (binary file parsing) work with a UDF:

asffileparser = F.udf(File()._parseBytes, AsfFileDelta.getSchema())
# _parseBytes() is a method that transforms the binary content into a dictionary
df.withColumn("asffileparsed", asffileparser("filename", "content"))

This works by explicitly defining the full schema of the resulting dictionary.
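
For reference, a minimal sketch of this explicit-schema pattern; the field names and types below are hypothetical, not the actual AsfFileDelta schema:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType

# Hypothetical schema standing in for AsfFileDelta.getSchema()
asf_schema = StructType([
    StructField("header_version", IntegerType()),
    StructField("sensor_id", StringType()),
    StructField("samples", ArrayType(DoubleType())),
])

def parse_bytes(filename, content):
    # A plain dict whose keys match the StructType fields maps cleanly onto a struct column
    return {"header_version": 1, "sensor_id": filename, "samples": [0.0, 1.0]}

asffileparser = F.udf(parse_bytes, asf_schema)
df = df.withColumn("asffileparsed", asffileparser("filename", "content"))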

However, the second part (the FFT transform) uses a separate processor that populates pandas DataFrames. A similar approach to the one above:

spectrumparser = F.udf(lambda asffile_dict: vars(Spectrum(asffile_dict)), SpectrumDelta.getSchema())
# The Spectrum() init method generates & populates several pandas DataFrames
df.withColumn("spectrum", spectrumparser(F.struct("*")))

runs into a Spark error as soon as the first pandas DataFrame is instantiated:

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.

Is the only solution really to rewrite the entire package natively in PySpark?

See also my Stack Overflow question:

https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows
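
For context, the exception is raised because the UDF's return value contains pandas objects (a DataFrame carries a pandas Index, the pandas.core.indexes.base._new_Index in the message), and the JVM-side unpickler can only reconstruct plain Python values that map onto Spark SQL types. One possible workaround is to convert every pandas object to plain lists/dicts before the UDF returns; this is only a sketch and assumes SpectrumDelta.getSchema() declares the DataFrame-valued fields as arrays of structs:

import pandas as pd
from pyspark.sql import functions as F

def spectrum_to_row(asffile_dict):
    spectrum = Spectrum(asffile_dict)
    out = {}
    for name, value in vars(spectrum).items():
        if isinstance(value, pd.DataFrame):
            # Replace the DataFrame with a list of plain dicts (one per row)
            out[name] = value.to_dict(orient="records")
        else:
            out[name] = value
    return out

spectrumparser = F.udf(spectrum_to_row, SpectrumDelta.getSchema())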

2 REPLIES

Debayan
Esteemed Contributor III

Hi, did you try to follow the suggestion in the error message, "Fix it by registering a custom IObjectConstructor for this class"?

Also, could you please provide us with the full error?

wim_schmitz_per
New Contributor II

Hi Debayan, thanks for getting back to me. How would I register this constructor?

Below is the full stderr thrown by Spark for the following command:

spectrumparser = F.udf(lambda inputDict: vars(Spectrum(inputDict["filename"], inputDict["path"], dict_=inputDict)), SpectrumDelta.getSchema())

spectrumStream = (asffiles
                  .withColumn("spectrum", spectrumparser(F.struct("*")))
                  .withColumn("batch_number", F.regexp_extract("filename", r'([^_]+)', 1))
                  .withColumn("spec_package_version", F.lit(__version__))
                  .select("path", "filename", "batch_number", "spec_package_version", "spectrum.*")
                  .display()
                 )
23/01/17 08:50:07 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 21)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:110)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
	at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Spark retries the task 4 times, then the command fails with the same error message.

The underlying Python class "Spectrum" initializes a helper class "SpectrumProcessor", which prepares 5 pandas DataFrames.
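
An alternative worth trying, rather than returning pandas objects from a plain UDF, is a pandas-native API such as DataFrame.mapInPandas, where the function receives and yields pandas DataFrames directly, so no per-row pickling of pandas objects is involved. This is only a sketch; the column names, Spectrum attributes, and output schema below are hypothetical:

import pandas as pd

def compute_spectra(batches):
    # batches is an iterator of pandas DataFrames (one per Arrow batch)
    for pdf in batches:
        rows = []
        for rec in pdf.to_dict(orient="records"):
            spectrum = Spectrum(rec["filename"], rec["path"], dict_=rec)
            rows.append({
                "filename": rec["filename"],
                "path": rec["path"],
                # Assumed attributes; flatten whatever SpectrumProcessor produced into plain columns
                "frequencies": list(spectrum.frequencies),
                "amplitudes": list(spectrum.amplitudes),
            })
        yield pd.DataFrame(rows)

result = asffiles.mapInPandas(
    compute_spectra,
    schema="filename string, path string, frequencies array<double>, amplitudes array<double>",
)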
