<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: Transforming/Saving Python Class Instances to Delta Rows in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/transforming-saving-python-class-instances-to-delta-rows/m-p/13814#M8419</link>
    <description>&lt;P&gt;Hi, did you try following the suggestion in the error message, "Fix it by registering a custom IObjectConstructor for this class."?&lt;/P&gt;&lt;P&gt;Also, could you please provide us with the full error?&lt;/P&gt;</description>
    <pubDate>Fri, 06 Jan 2023 05:45:32 GMT</pubDate>
    <dc:creator>Debayan</dc:creator>
    <dc:date>2023-01-06T05:45:32Z</dc:date>
    <item>
      <title>Transforming/Saving Python Class Instances to Delta Rows</title>
      <link>https://community.databricks.com/t5/data-engineering/transforming-saving-python-class-instances-to-delta-rows/m-p/13813#M8418</link>
      <description>&lt;P&gt;I'm trying to reuse a Python package that performs a complex series of parsing steps, turning binary files into workable data in Delta format.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I have made the first part (binary file parsing) work with a UDF, by explicitly defining the full schema of the resulting dictionary:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;asffileparser = F.udf(File()._parseBytes, AsfFileDelta.getSchema())
# _parseBytes() is a method that transforms the binary content into a dictionary
df.withColumn("asffileparsed", asffileparser("filename", "content"))&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;However, the second part (an FFT transform) uses a separate processor that populates pandas DataFrames. A similar approach:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;spectrumparser = F.udf(lambda asffile_dict: vars(Spectrum(asffile_dict)), SpectrumDelta.getSchema())
# The Spectrum() init method generates &amp;amp; populates several pandas DataFrames
df.withColumn("spectrum", spectrumparser(F.struct("*")))&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;runs into a Spark error as soon as the first pandas DataFrame is instantiated:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Is the only solution really to rewrite the entire package natively in PySpark?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;See also my StackOverflow question:&lt;/P&gt;&lt;P&gt;&lt;A href="https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows" alt="https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows" target="_blank"&gt;https://stackoverflow.com/questions/74996188/transforming-python-classes-to-spark-delta-rows&lt;/A&gt;&lt;/P&gt;</description>
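A likely cause, judging from the quoted error: a plain F.udf must return values built only from native Python types matching the declared schema, while vars(Spectrum(...)) exposes pandas DataFrame, Series, and Index objects that Spark's pickler cannot reconstruct. A minimal sketch (helper names are hypothetical, not from the package) of sanitizing such a dict before the UDF returns it:

```python
import pandas as pd

def to_native(obj):
    """Recursively replace pandas objects with plain Python containers,
    so a UDF return value matches a declared Spark struct/array schema."""
    if isinstance(obj, pd.DataFrame):
        return obj.to_dict("records")          # list of plain dicts
    if isinstance(obj, (pd.Series, pd.Index)):
        return list(obj)
    if isinstance(obj, dict):
        return {k: to_native(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [to_native(v) for v in obj]
    return obj
```

The UDF lambda would then become something like lambda d: to_native(vars(Spectrum(d))), assuming SpectrumDelta.getSchema() models each DataFrame as an array of structs.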
      <pubDate>Tue, 03 Jan 2023 17:53:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/transforming-saving-python-class-instances-to-delta-rows/m-p/13813#M8418</guid>
      <dc:creator>wim_schmitz_per</dc:creator>
      <dc:date>2023-01-03T17:53:04Z</dc:date>
    </item>
    <item>
      <title>Re: Transforming/Saving Python Class Instances to Delta Rows</title>
      <link>https://community.databricks.com/t5/data-engineering/transforming-saving-python-class-instances-to-delta-rows/m-p/13815#M8420</link>
      <description>&lt;P&gt;Hi Debayan! Thanks for getting back to me. How would I register this constructor?&lt;/P&gt;&lt;P&gt;Below is the full stderr thrown by Spark for the following command:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;spectrumparser = F.udf(lambda inputDict: vars(Spectrum(inputDict["filename"], inputDict["path"], dict_=inputDict)), SpectrumDelta.getSchema())
 
spectrumStream = (asffiles
                  .withColumn("spectrum",spectrumparser(F.struct("*")))
                  .withColumn("batch_number",F.regexp_extract("filename",r'([^_]+)',1))
                  .withColumn("spec_package_version",F.lit(__version__))
                  .select("path","filename","batch_number","spec_package_version","spectrum.*")
                  .display()
                 )&lt;/CODE&gt;&lt;/PRE&gt;&lt;PRE&gt;&lt;CODE&gt;23/01/17 08:50:07 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 21)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pandas.core.indexes.base._new_Index). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:110)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
	at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Spark retries the task 4 times, then the command fails with the same error message.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The underlying Python class "Spectrum" instantiates a "SpectrumProcessor" object, which prepares 5 pandas DataFrames.&lt;/P&gt;</description>
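One workaround worth trying (an assumption on my part, not something confirmed in this thread): serialize each prepared DataFrame to a JSON string inside the UDF and return plain string columns, then parse them on the Spark side with from_json. A sketch over a hypothetical dict of named DataFrames:

```python
import json
import pandas as pd

def frames_to_json(frames):
    """Serialize each pandas DataFrame to a JSON string so the UDF returns
    only plain Python strings, which Spark converts without needing a
    custom IObjectConstructor."""
    return {name: df.to_json(orient="records") for name, df in frames.items()}
```

This trades schema enforcement for robustness: the UDF's return type shrinks to a struct of strings, and the real schema is applied afterwards via from_json.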
      <pubDate>Tue, 17 Jan 2023 08:55:30 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/transforming-saving-python-class-instances-to-delta-rows/m-p/13815#M8420</guid>
      <dc:creator>wim_schmitz_per</dc:creator>
      <dc:date>2023-01-17T08:55:30Z</dc:date>
    </item>
    <item>
      <title>Re: Transforming/Saving Python Class Instances to Delta Rows</title>
      <link>https://community.databricks.com/t5/data-engineering/transforming-saving-python-class-instances-to-delta-rows/m-p/13814#M8419</link>
      <description>&lt;P&gt;Hi, did you try following the suggestion in the error message, "Fix it by registering a custom IObjectConstructor for this class."?&lt;/P&gt;&lt;P&gt;Also, could you please provide us with the full error?&lt;/P&gt;</description>
      <pubDate>Fri, 06 Jan 2023 05:45:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/transforming-saving-python-class-instances-to-delta-rows/m-p/13814#M8419</guid>
      <dc:creator>Debayan</dc:creator>
      <dc:date>2023-01-06T05:45:32Z</dc:date>
    </item>
  </channel>
</rss>

