
Why is Delta Lake creating a 238.0TiB shuffle on merge?

JordanYaker
Contributor

I'm frankly at a loss here. I have a task that is consistently performing just awfully. I took some time this morning to try to debug it, and the physical plan is showing a 238 TiB shuffle:

== Physical Plan ==
AdaptiveSparkPlan (40)
+- == Current Plan ==
   SerializeFromObject (22)
   +- MapPartitions (21)
      +- DeserializeToObject (20)
         +- Project (19)
            +- ObjectHashAggregate (18)
               +- Exchange (17)
                  +- ObjectHashAggregate (16)
                     +- ObjectHashAggregate (15)
                        +- ShuffleQueryStage (14), Statistics(sizeInBytes=238.0 TiB)
                           +- Exchange (13)
                              +- ObjectHashAggregate (12)
                                 +- * Project (11)
                                    +- CartesianProduct Inner (10)
                                       :- * Project (5)
                                       :  +- * Filter (4)
                                       :     +- * Project (3)
                                       :        +- * ColumnarToRow (2)
                                       :           +- Scan parquet  (1)
                                       +- * Project (9)
                                          +- * Project (8)
                                             +- * ColumnarToRow (7)
                                                +- Scan parquet  (6)

I could understand this number if I were working with a lot of data, but I'm not. The Cartesian product in this query only produces 125 rows, so it's not my merge logic.

Additionally, the output table isn't very big either; it's 15 files, with no file larger than 10 MB. (Note: I could definitely do some repartitioning here for a better setup, but that's another story.)

I feel like I'm at my wits' end with this problem. Any ideas would be appreciated.
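
(As a side note on the repartitioning remark above: a minimal sketch of what that compaction could look like, assuming the output is a Delta table on a recent Delta Lake runtime. The table name is hypothetical, not from the original job.)

import io.delta.tables.DeltaTable

// spark is the ambient SparkSession in a Databricks notebook.
// OPTIMIZE rewrites many small files into fewer, larger ones without
// changing the data; "my_output_table" is a made-up name.
spark.sql("OPTIMIZE my_output_table")

// The same compaction via the Delta Scala API.
DeltaTable.forName(spark, "my_output_table").optimize().executeCompaction()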


Anonymous
Not applicable

So I'm not too sure of the problem, but I'll walk you through my thinking and ideas.

Is the deserialize/map/serialize at the top of the plan coming from a case class in Scala?

How big are the two tables you're joining?
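
(As a side note on the deserialize/map/serialize question: a minimal sketch of the kind of typed Dataset code that produces the DeserializeToObject / MapPartitions / SerializeFromObject nodes at the top of the plan. The case class, table name, and logic here are all hypothetical, not taken from the original job.)

import org.apache.spark.sql.SparkSession

// Hypothetical case class standing in for whatever the job maps over.
case class Change(id: Long, amount: Double)

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Table name is an assumption.
val ds = spark.read.table("staged_changes").as[Change]

// A typed mapPartitions like this shows up in the physical plan as
// DeserializeToObject -> MapPartitions -> SerializeFromObject: Spark turns
// rows into JVM objects, runs the lambda, then serializes them back to rows.
val mapped = ds.mapPartitions(iter => iter.map(c => c.copy(amount = c.amount * 2)))
mapped.explain()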

JordanYaker
Contributor

@Joseph Kambourakis one table is 1.5 MB. The other is about 80 MB.

Anonymous
Not applicable

Hmm, then it doesn't make sense that it would create that much data in a shuffle, or in any capacity. What does the shuffle look like in the plan? It should show the data written/read in that part.

Not very big.


What's interesting is that this stage ran for 7 hours, and most of that is scheduler delay.

Anonymous
Not applicable

The input size and record counts look like what you'd expect from the table sizes, and it's not actually creating 238 TiB, thankfully. That said, I'm not exactly sure what the problem is in that stage, but there is definitely something going on with that run time.

JordanYaker
Contributor

At this point I'm honestly wondering if it's just a quirk of how the merge logic gets planned.

I tried running a join between the output files and what would be the input to my MERGE statement. I ran an explain on that query, and it ends up creating a BroadcastNestedLoopJoin. More often than not, nested loop joins have bedeviled my performance. I'm going to try splitting the merge into two separate calls and see if that does the trick for me.

It might just be that the explain on a MERGE doesn't show this because of how merges are executed.
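
(A minimal sketch of what splitting the merge into two separate calls could look like with the Delta Lake Scala API. The table names, join key, and the updateAll/insertAll actions are assumptions, not the original job's logic.)

import io.delta.tables.DeltaTable

// spark is the ambient SparkSession in a Databricks notebook.
// Hypothetical target and source; the real names and merge condition differ.
val target = DeltaTable.forName(spark, "target_table")
val source = spark.read.table("staged_updates")

// Call 1: only update rows that already exist in the target.
target.as("t")
  .merge(source.as("s"), "t.id = s.id")
  .whenMatched()
  .updateAll()
  .execute()

// Call 2: only insert rows that don't exist in the target yet.
target.as("t")
  .merge(source.as("s"), "t.id = s.id")
  .whenNotMatched()
  .insertAll()
  .execute()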

Vartika
Moderator

Hi @Jordan Yaker,

Hope all is well!

Just wanted to check in and see if you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!

It turned out to be the BroadcastNestedLoopJoin. Once I reworked my logic to remove that, the performance cleared up.
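
(For anyone landing here with the same symptom: a BroadcastNestedLoopJoin usually appears when the join or merge condition has no equality predicate Spark can hash on. Below is a hedged sketch of the kind of rewrite involved; the table and column names are made up, not the poster's actual logic.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
val left  = spark.read.table("left_table")   // hypothetical
val right = spark.read.table("right_table")  // hypothetical

// Only range predicates: Spark can't plan a hash or sort-merge join, so it
// falls back to BroadcastNestedLoopJoin / CartesianProduct.
val slow = left.join(right,
  left("event_time") >= right("window_start") &&
  left("event_time") <  right("window_end"))

// Adding an equality on a shared key lets Spark use a hash or sort-merge
// join and apply the range conditions as a filter afterwards.
val fast = left.join(right,
  left("key") === right("key") &&
  left("event_time") >= right("window_start") &&
  left("event_time") <  right("window_end"))

slow.explain()  // expect BroadcastNestedLoopJoin or CartesianProduct
fast.explain()  // expect SortMergeJoin or BroadcastHashJoin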
