
Transforming/Saving Python Class Instances to Delta Rows

wim_schmitz_per
New Contributor II

I'm trying to reuse a Python package that performs a complex series of steps to parse binary files into workable data, which I want to store in Delta format.

I have made the first part (binary file parsing) work with a UDF:

from pyspark.sql import functions as F

# _parseBytes() is a method that transforms the binary content into a dictionary
asffileparser = F.udf(File()._parseBytes, AsfFileDelta.getSchema())
df.withColumn("asffileparsed", asffileparser("filename", "content"))

This works because I explicitly define the full schema of the resulting dictionary.

However, the second part (the FFT transform) uses a separate processor that populates pandas DataFrames. A similar approach to the one above:

# The Spectrum() init method generates & populates several pandas DataFrames
spectrumparser = F.udf(lambda asffile_dict: vars(Spectrum(asffile_dict)), SpectrumDelta.getSchema())
df.withColumn("spectrum", spectrumparser(F.struct("*")))

runs into a Spark error as soon as the first pandas DataFrame is instantiated:

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.

Is the only solution really to rewrite the entire package natively in PySpark?

See also my Stack Overflow question:

https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows

2 REPLIES

Debayan
Esteemed Contributor III

Hi, did you try following the suggestion in the error message, "Fix it by registering a custom IObjectConstructor for this class."?

Also, could you please provide us the full error?

Hi Debayan! Thanks for getting back to me. How would I register this constructor?

Below is the full stderr thrown by Spark for the following command:

spectrumparser = F.udf(
    lambda inputDict: vars(Spectrum(inputDict["filename"], inputDict["path"], dict_=inputDict)),
    SpectrumDelta.getSchema(),
)

spectrumStream = (asffiles
                  .withColumn("spectrum", spectrumparser(F.struct("*")))
                  .withColumn("batch_number", F.regexp_extract("filename", r'([^_]+)', 1))
                  .withColumn("spec_package_version", F.lit(__version__))
                  .select("path", "filename", "batch_number", "spec_package_version", "spectrum.*")
                 )
# display() is called separately so that spectrumStream keeps the DataFrame
spectrumStream.display()

23/01/17 08:50:07 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 21)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:110)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
	at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Spark retries 4 times, then the command fails with the same error message.
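
A note on the trace, offered as a hedged reading rather than a confirmed diagnosis: the failing frame is Unpickler.load_reduce, which handles the pickle REDUCE opcode ("call this constructor with these arguments"). Values built only from built-in types pickle without that opcode, while richer objects (such as a pandas Index) need it, and the JVM-side unpickler then has to know how to construct them. The sketch below uses only the standard library to show the difference; fractions.Fraction is just a stand-in for the pandas objects and is not part of the original package.

```python
import pickle
import pickletools
from fractions import Fraction


def opcode_names(obj):
    """Return the names of the pickle opcodes used to serialize obj."""
    return [op.name for op, _arg, _pos in pickletools.genops(pickle.dumps(obj))]


# A dictionary holding a non-built-in object (stand-in for a pandas Index/DataFrame).
custom_ops = opcode_names({"spectrum": Fraction(1, 2)})

# The same shape of dictionary holding only built-in types.
plain_ops = opcode_names({"spectrum": [1.0, 2.0]})

print("REDUCE" in custom_ops)
print("REDUCE" in plain_ops)
```

If this reading is right, the error is about what the UDF returns, not about pandas being used inside the UDF.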

The underlying Python class "Spectrum" initializes a subclass "SpectrumProcessor", which prepares 5 pandas DataFrames.
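
One direction that might avoid rewriting the package, assuming the problem is that vars(Spectrum(...)) still contains pandas DataFrames when the UDF returns: convert those attributes to built-in Python types inside the UDF. The helper below is a minimal sketch; to_plain is a hypothetical name, and it duck-types on tolist() and to_dict() so that it runs without importing pandas. Values such as pandas Timestamps would need extra handling that is not shown here.

```python
from typing import Any


def to_plain(value: Any) -> Any:
    """Recursively convert pandas/NumPy-style containers into built-in types."""
    # Series, Index, ndarray and NumPy scalars expose tolist(),
    # which returns built-in lists or scalars.
    if hasattr(value, "tolist"):
        return to_plain(value.tolist())
    # A DataFrame has no tolist(); to_dict(orient="list") gives {column: [values]}.
    if hasattr(value, "to_dict"):
        return to_plain(value.to_dict(orient="list"))
    if isinstance(value, dict):
        return {key: to_plain(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_plain(item) for item in value]
    # Built-in scalars (str, int, float, bytes, None, ...) pass through unchanged.
    return value
```

In the UDF from this thread it could be used as F.udf(lambda d: to_plain(vars(Spectrum(d))), SpectrumDelta.getSchema()). The schema would then have to describe the converted shape, for example a struct of arrays for each DataFrame, which is an assumption about SpectrumDelta that the thread does not confirm.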
