Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Joining a large amount of data causes an "out of disk space" error; how should I ingest it?

Erik_L
Contributor II

What I am trying to do

from pyspark.sql.functions import lit

df = None
 
# For all of the IDs that are valid
for id in ids:
  # Get the parts of the data from different sources
  df_1 = spark.read.parquet(url_for_id)
  df_2 = spark.read.parquet(url_for_id)
  ...
 
  # Join together the parts based on their common columns like time and id
  df_join = df_1.join(df_2, on=idx_columns, how='inner').join(...).withColumn('id', lit(id))
 
  # Union all the IDs into one big table
  if df is None:
    df = df_join
  else:
    df = df.union(df_join)
 
# Now write it out to a new table to work with
df.writeTo(table).createOrReplace()

Each ID has hundreds of GB of data broken into 0.5 - 1.5 GB chunks. The total size of the unioned table would be between 7 and 10 TB.

What happened

Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: INTERNAL: /local_disk0/spark-.../executor-.../dbio_cache_root_.../unified_cache-.../.../.../...: No space left on device

It seems like the union isn't handled cleverly, because when I execute this on a per-ID level, it works fine.

What I need help on

What is the best way to handle these kinds of big data joins? How can I read and write the data effectively?

For example:

  • Should I join each table and just immediately run `.writeTo(table).append()`? Will Spark cope with the fact that I'm doing this tens of times at the same time? (Sketched after this list.)
  • Can checkpointing help with this process?
  • Is there a better way than union?
  • The hard part is that the ID is in the path of the objects, so I'm reading each path and adding the ID with lit(). I've tried extracting it from the file path with a regex, but that runs slower because it works per-row rather than per-ID. Hence, using streaming probably won't work.
  • The data has timestamps, but Spark isn't keeping the data locally clustered. Should I ignore Databricks' ingestion-time clustering optimizations and just force some of the options? Z-ordering would take forever on this scale of data, and the data is already locally clustered, so I want to just keep it that way during ingestion.
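For concreteness, here is a minimal sketch of what the per-ID append from the first bullet would look like. The target table name and the url_for() helper are hypothetical stand-ins for however the per-ID paths are actually built:

from pyspark.sql.functions import lit

target_table = "catalog.schema.joined_events"  # hypothetical target table

for id in ids:
  # Read the parts for this ID from their separate locations
  df_1 = spark.read.parquet(url_for(id, "source_1"))  # url_for() is a placeholder
  df_2 = spark.read.parquet(url_for(id, "source_2"))

  df_join = df_1.join(df_2, on=idx_columns, how="inner").withColumn("id", lit(id))

  # Append this ID's slice immediately instead of holding one giant union plan;
  # each iteration is an independent job, so the shuffle/spill stays at per-ID scale.
  # (Create the target table before the loop, or createOrReplace on the first pass.)
  df_join.writeTo(target_table).append()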

Help, please.

2 REPLIES

Anonymous
Not applicable

@Erik Louie:

There are several strategies that you can use to handle large joins like this in Spark (a combined sketch follows the list):

  1. Use a broadcast join: If one of your dataframes is small enough to fit comfortably in memory on each executor (typically no more than a few GB), you can use a broadcast join to avoid shuffling data. A broadcast join can be used when one dataframe is much smaller than the other, since Spark ships the entire smaller dataframe to every node. You can use the broadcast() function to explicitly broadcast a dataframe.
  2. Increase the number of shuffle partitions: When you perform a join in Spark, the data is shuffled between nodes. By default, Spark uses 200 shuffle partitions, but you can increase this number to reduce the amount of data in each partition (and therefore the spill on any single disk). You can set the spark.sql.shuffle.partitions configuration parameter to a larger value (e.g., 1000).
  3. Use a bucketed join: If your data is already partitioned or clustered by a key that you will use for joining, you can use a bucketed join to avoid shuffling data. A bucketed join requires that both dataframes are bucketed by the same key and sorted by the same key, so you may need to reorganize your data accordingly. You can use the bucketBy() and sortBy() functions to bucket and sort a dataframe.
  4. Use a checkpointing mechanism: If you're running out of memory or local disk during the join, you can use checkpointing to write intermediate results to durable storage and truncate the query plan. This can help prevent out-of-space errors and improve performance for large dataframes. Set a checkpoint directory with spark.sparkContext.setCheckpointDir() and then call checkpoint() on a dataframe (or use localCheckpoint() if you don't want to configure a directory).
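A minimal, hedged sketch of strategies 1, 2, and 4 above. small_df, large_df, idx_columns, and the checkpoint path are placeholders, not the asker's actual names:

from pyspark.sql.functions import broadcast

# 2. More shuffle partitions so each partition (and its spill) is smaller
spark.conf.set("spark.sql.shuffle.partitions", 1000)

# 4. Checkpointing needs a directory on durable storage (path is an assumption)
spark.sparkContext.setCheckpointDir("s3://your-bucket/spark-checkpoints")

# 1. Broadcast join: only worthwhile if small_df truly fits in executor memory
joined = large_df.join(broadcast(small_df), on=idx_columns, how="inner")

# 4. Materialize the intermediate result and cut the lineage before further unions/joins
joined = joined.checkpoint()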

In terms of reading and writing the data, you may want to consider using a distributed file system or object store like HDFS or S3, which can help with scalability and fault tolerance. You can use Spark's read and write APIs (spark.read / df.write) to work with data in these systems.

Overall, the best strategy for handling large joins will depend on the specifics of your data and your use case. You may need to experiment with different approaches and configurations to find the optimal solution for your needs.

Thank you for the thorough response -- I'll look into some of these. For extra context, here are some constraints on the data:

All of the tables should be _mostly_ the same size and have the same keys, since they are generated by processes that produce time series data (that is, the data source is a series of Parquet tables in S3 that are sorted by time, and each timestamp will _usually_ exist in all N tables, but not always). Before ingesting, these tables are sorted by timestamp, but Spark shuffles on ingest because I cast the Unix timestamp to an ISO timestamp (I'd really like to avoid this if possible, but I'm not sure how, aside from the `bucketBy` and `sortBy` you suggested earlier).
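For reference, a minimal sketch of the bucketBy/sortBy write mentioned above, with the timestamp cast done inline so rows stay ordered within each bucket. The column names, bucket count, and table name are assumptions, and bucketed tables are a Hive/Parquet-style feature rather than Delta, so this is only an illustration of the suggestion:

from pyspark.sql.functions import col, timestamp_seconds

(df_join
  .withColumn("event_time", timestamp_seconds(col("unix_ts")))  # assumed column names
  .write
  .bucketBy(256, "id")        # bucket count is a guess; tune to the data volume
  .sortBy("event_time")
  .format("parquet")
  .mode("append")
  .saveAsTable("events_joined"))  # hypothetical table name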

Unfortunately, I don't have control over the way these tables are generated right now, but in the future I intend for them to be generated pre-joined rather than separate, since there's no value in keeping them separated.
