Data Engineering

Using large dataframe in-memory (data not allowed to be "at rest") results in driver crash and/or out of memory

James_209101
New Contributor II

I'm having trouble working on Databricks with data that we are not allowed to save off or persist in any way. The data comes from an API (which returns a JSON response). We have a Scala package on our cluster that makes the queries (almost 6k of them), saves the responses to a DataFrame, and then explodes that DataFrame into a new DataFrame that we can use to get the info we need. However, all the data gets collected to the driver node. Then, when we try to run comparisons/validations/Spark code against the resulting DataFrame, the work isn't distributed (everything runs on the driver) and it uses up all of the driver's JVM heap memory.

The two errors we get are "OutOfMemoryError: Java Heap Space" or "The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached."

We've got a 64GB driver (with five 32GB worker nodes) and have increased the max JVM heap memory to 25GB. But because the work isn't distributed to the worker nodes, the driver keeps crashing with OOM errors.

Things run fine when we do save a parquet of this data and run our spark code/comparisons against the parquet (distributes the work fine), but, per an agreement with the data owners, we are only allowed to save data like this for development purposes (currently not a viable option for production).

Things we've tried:

-Removing all displays/prints/logs

-Caching the dataframe (and prior/subsequent dataframes to which it is connected) with .cache()/.count() and spark.sql("CACHE TABLE <tablename>")

-Increasing the size of the driver node to 64GB

-Increasing the JVM heap size to 25GB

-Removing unneeded columns from the dataframe

-Using .repartition(300) on the dataframe

Any help is greatly appreciated.

2 REPLIES

Hubert-Dudek
Esteemed Contributor III

Please check your code for .collect() or similar functions. collect() is the most common cause of OOM errors. As a next step, analyze the Spark UI and look for data spills, which happen when data is written from memory to disk and back because it is too large to fit in memory.
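
One way to avoid collecting the API responses on the driver is to distribute the query IDs first and make the calls inside the executors. The following is a minimal Python sketch of that idea, not the Scala package from the question: `fetch_partition`, `build_distributed_df`, and `fake_api` are hypothetical names, the two-column schema is assumed, and the default of 300 partitions simply mirrors the `.repartition(300)` mentioned in the question. It also assumes the API is reachable from the worker nodes.

```python
import json
from typing import Callable, Iterable, Iterator, Tuple


def fetch_partition(
    query_ids: Iterable[int],
    call_api: Callable[[int], str],
) -> Iterator[Tuple[int, str]]:
    """Runs on an executor: one API call per query id in this partition."""
    for query_id in query_ids:
        yield (query_id, call_api(query_id))


def build_distributed_df(spark, query_ids, call_api, num_partitions=300):
    """Build the DataFrame from an RDD so rows are produced on the executors.

    `spark` is an existing SparkSession. Only the list of query ids starts
    on the driver; the (potentially large) responses never pass through it.
    """
    rdd = (
        spark.sparkContext
        .parallelize(query_ids, num_partitions)
        .mapPartitions(lambda ids: fetch_partition(ids, call_api))
    )
    # Assumed schema: the raw JSON response is kept as a string column.
    return spark.createDataFrame(rdd, schema="query_id BIGINT, payload STRING")


def fake_api(query_id: int) -> str:
    """Hypothetical stand-in for the real API client."""
    return json.dumps({"id": query_id, "items": [query_id, query_id + 1]})
```

With a real session this could be called as `build_distributed_df(spark, range(6000), fake_api)`. The `payload` column can then be parsed with `from_json` and exploded on the executors, without an explicit save step.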

Below is a copy-paste of my list of optimization techniques. Maybe one of them will help:

  • Look for data skew: incorrect partitioning can leave some partitions huge and others small. You can check this in the Spark UI, or debug your code a bit (call getNumPartitions()). SQL reads in particular can split data unevenly across partitions (the connector has settings for this, such as lowerBound). Try setting the number of partitions to the workers' total core count multiplied by X, so that partitions are processed step by step in a queue.
  • Check for data spills in the Spark UI; a spill means shuffle partitions are written from RAM to disk and back. (The 25th, 50th, and 75th percentiles should be similar.) Increase the number of shuffle partitions if they are frequently written to disk.
  • Increase the number of shuffle partitions: spark.sql.shuffle.partitions defaults to 200, so try a larger value. Calculate it as the data size divided by the target partition size.
  • Increase the size of the driver to be two times bigger than the executors (but to find the optimal size, analyze the load: in Databricks, the cluster's Metrics tab shows Ganglia, or even better, integrate Datadog with the cluster).
  • Check wide transformations, the ones that need to shuffle data between partitions, and group them so that only one shuffle is needed.
  • If you need to filter data, do it right after reading from SQL where possible, so that predicate pushdown applies and the filter is added as a WHERE clause in the SQL query.
  • Make sure that everything runs in a distributed way, especially UDFs. It helps to use vectorized pandas UDFs so that they run on the executors. Even pandas UDFs can be slow, so you can instead try registering a SQL function in the metastore and using it in your SQL queries.
  • Don't use collect() and similar functions: they do not run in a distributed way, but instead load everything into an in-memory object on the driver (it is like executing the notebook locally).
  • Regarding infrastructure, use more workers and check that your ADLS is connected via a private link.
  • You can also use premium ADLS, which is faster.
  • Sometimes I process big data as a stream, as it is easier with big data sets. In that scenario, you would need Kafka (Confluent Cloud is excellent) between SQL and Databricks. If you use the stream for one-time processing, use the AvailableNow trigger instead of Once.
  • When you read data, remember that reading everything from big unpartitioned files (around 200 MB each) will be faster. So for a Delta table, you can run OPTIMIZE before reading. Of course, if you read only, for example, the last x days, partitioning per day will be faster, but running OPTIMIZE can help anyway.
  • Regarding writing, by default it is best to get as many files as cores/partitions, so that every core writes one file in parallel. Later you can merge them by running OPTIMIZE. Check that every file is similar in size; if not, you probably have skew in the partitions. In a huge dataset, it sometimes helps to add a salt number and then partition by that number to make the partitions equal (you can get the number of cores from the SparkContext.defaultParallelism property).
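
The partition-sizing and skew advice in the list above can be sketched in Python as follows. This is only an illustration under stated assumptions: the function names are hypothetical, the 128 MB target partition size and the core counts in the usage note are example values rather than recommendations from this thread, and "max divided by median" is just one simple way to express skew.

```python
import math
import statistics
from typing import List


def shuffle_partition_count(
    data_size_bytes: int,
    target_partition_bytes: int,
    total_cores: int,
) -> int:
    """Data size divided by target partition size, rounded up to a whole
    multiple of the core count so every wave of tasks keeps all cores busy."""
    by_size = math.ceil(data_size_bytes / target_partition_bytes)
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores


def partition_row_counts(df) -> List[int]:
    """Count rows per partition on the executors.

    Only one integer per partition is returned to the driver, so this
    collect() stays small regardless of the DataFrame's size.
    """
    return df.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()


def skew_ratio(row_counts: List[int]) -> float:
    """Largest partition divided by the median partition (1.0 means balanced)."""
    largest = max(row_counts)
    if largest == 0:
        return 1.0  # every partition is empty, so there is nothing to skew
    median = statistics.median(row_counts)
    return largest / median if median else float("inf")
```

For example, 50 GiB of shuffle data with 128 MiB partitions on 40 cores gives `shuffle_partition_count(50 * 1024**3, 128 * 1024**2, 40)`, which is 400. The result could then be applied with `spark.conf.set("spark.sql.shuffle.partitions", str(n))`, and `skew_ratio(partition_row_counts(df))` gives a quick number to compare before and after repartitioning.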

Anonymous
Not applicable

Hi @James Held​ 

Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
