Hi, I'm trying to process a small dataset (less than 300 MB) built from the results of five queries that run on Spark. The end result of those queries is parsed using Python and merged into a data frame. Then I try to write this to a Delta Lake table using:
parsedData.write.format('delta').mode('overwrite').option("mergeSchema", "true").save('/mnt/features/dev_customer_account_info')
This single line always causes a memory spike (up to 60 GB) that leads to a crash, regardless of the size of parsedData.
The cluster is configured as:
1 Driver 61 GB Memory, 8 Cores
Runtime 11.3.x-cpu-ml-scala2.12
The error looks like this:
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:941)
at org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:163)
at org.apache.spark.sql.catalyst.expressions.StructsToJson.getAndReset$1(jsonExpressions.scala:893)
at org.apache.spark.sql.catalyst.expressions.StructsToJson.$anonfun$converter$5(jsonExpressions.scala:904)
at org.apache.spark.sql.catalyst.expressions.StructsToJson$$Lambda$12421/1187286213.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.StructsToJson.nullSafeEval(jsonExpressions.scala:947)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:671)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:127)
at org.apache.spark.sql.execution.python.EvalPythonExec$$Lambda$12407/1574333163.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.util.GroupedAsArrayIterator.next(GroupedAsArrayIterator.scala:45)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:464)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:55)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:573)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$11996/626269711.apply(Unknown Source)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2340)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:365)
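Reading the trace, the failing frames are StructsToJson inside EvalPythonExec, and Spark evaluates lazily, so the write is just the first action that runs the whole plan behind parsedData. The oversized array may therefore come from a struct being serialized to JSON for a Python UDF rather than from the Delta writer itself. Below is a minimal sketch for checking whether the plan contains such nodes without running the job. The helper names (find_suspect_nodes, plan_as_text) are hypothetical, and matching on the printed plan text is only a heuristic that assumes the nodes appear as "BatchEvalPython" and "to_json" in the explain output.

```python
import contextlib
import io

# Markers derived from the stack trace (EvalPythonExec, StructsToJson).
# Assumption: they show up under these names in the printed plan.
SUSPECT_MARKERS = ("BatchEvalPython", "to_json", "structstojson")


def find_suspect_nodes(plan_text, markers=SUSPECT_MARKERS):
    """Return the plan lines that mention any marker (case-insensitive)."""
    hits = []
    for line in plan_text.splitlines():
        lowered = line.lower()
        if any(marker.lower() in lowered for marker in markers):
            hits.append(line.strip())
    return hits


def plan_as_text(df):
    """Capture what df.explain() prints; this does not execute the job."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()
```

Usage would be something like find_suspect_nodes(plan_as_text(parsedData)). If it returns lines, the memory is possibly going into the JSON conversion that feeds the Python parsing step, and the write only triggers it.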
I've tried running the queries with paging to reduce the amount of data that has to be saved to the table, probably down to as little as 100 MB, but this step still consumes all available RAM and crashes.
The contents of the data frame are pretty standard. I'm at a loss here about what could be done. I'd really appreciate any comments, thoughts or ideas.
Thank you very much