Spark tasks too slow and not doing parallel processing

sanjay
Valued Contributor II

Hi,

I have a Spark job that is processing a large data set, and it is taking too long. In the Spark UI, I can see it is running only 1 task out of 9. I am not sure how to run this in parallel. I have already enabled autoscaling with up to 8 instances.

Attached is an image of the Spark UI.

Please suggest how to debug this and fix the performance issue.

12 REPLIES

-werners-
Esteemed Contributor III

From the screenshot you provided it seems you are doing a merge statement.

Depending on the partitioning of your Delta table, this can be done in parallel or not.

For example, if all your incoming data resides in one huge partition, Spark will have to completely rewrite that huge partition, which can take a long time.
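A pattern that sometimes helps here (a minimal sketch; the table, source dataframe, and column names are placeholders) is to include the partition column in the merge condition so Delta only has to rewrite the partitions the incoming data actually touches:

from delta.tables import DeltaTable

# Placeholder names throughout; adding the partition column (here event_date)
# to the ON clause lets Delta prune partitions that the merge does not touch
target = DeltaTable.forName(spark, "my_table")
(target.alias("t")
 .merge(source_df.alias("s"),
        "t.id = s.id AND t.event_date = s.event_date")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())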

Can you share some code?

pvignesh92
Honored Contributor

Hi @Sanjay Jain Did you get a chance to see how many partitions are available in your dataframe before performing the merge operation, and how the data is distributed between them? This will help you see whether you have any skewed data. Also, you might need to look at the key on which you are merging to check for skew on any specific set of values.

The code below will help you get the record count per partition:

from pyspark.sql.functions import spark_partition_id

# Count the records in each partition of the source dataframe
rawDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

sanjay
Valued Contributor II

My partitioning is based on date. Here is the partition information for around 70k records:

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|14557|
|          1|25455|
|          2|20330|
|          3| 1776|
|          4| 2868|
|          5| 1251|
|          6| 1145|
|          7|  127|
+-----------+-----+

-werners-
Esteemed Contributor III

That is pretty skewed; however, that does not explain why there is no parallelism.

The only reasons I can see are that either:

- the merge only hits one partition, or

- you apply a coalesce(1) or repartition(1) somewhere.

sanjay
Valued Contributor II

There are 8 partitions, and this is the same data I need to merge.

How can I check how many partitions are used by the merge? It should use all 8 partitions.

No, I haven't used coalesce or repartition.

Is it possible to connect live? I can show you the code.

-werners-
Esteemed Contributor III

In the history of the Delta table you can see how many files have been rewritten (in the operationMetrics column).

There are statistics like numTargetFilesAdded, numTargetFilesRemoved, etc.
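A quick way to look at this (a minimal sketch; the table name is a placeholder) is to pull the recent operation metrics from the Delta history:

from delta.tables import DeltaTable

# "my_table" is a placeholder; use your Delta table name, or DeltaTable.forPath for a path
history = DeltaTable.forName(spark, "my_table").history(5)
history.select("version", "operation", "operationMetrics").show(truncate=False)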

The fact that your source dataframe (i.e. the incoming data) has 8 partitions does not mean that the Delta Lake table will also update 8 partitions.

sanjay
Valued Contributor II

The Delta table has the same partition columns as the source and should have 8 partitions.

-werners-
Esteemed Contributor III

The number of partitions in the Delta table is not relevant; what is relevant is how many partitions or files are affected by the merge.

That can be displayed in the Delta history.

Databricks can also apply optimizations while writing, so it is possible that it decides to write a single file instead of 8. Writing will be slower, but reading will be faster.
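To check whether optimized writes are playing a role here (a minimal sketch; the table name is a placeholder, and these are Databricks-specific settings), you can inspect the table properties; turning the feature off is generally not advisable:

# Placeholder table name; look for delta.autoOptimize.optimizeWrite in the output
spark.sql("SHOW TBLPROPERTIES my_table").show(truncate=False)

# Only if you want to experiment: disable optimized writes for this table
# spark.sql("ALTER TABLE my_table SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = false)")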

sanjay
Valued Contributor II

Any suggestions to improve the performance? Is there any parameter or configuration to optimize this? Any documentation on how to debug this in the Spark UI?

-werners-
Esteemed Contributor III

There are several methods:

You can disable optimizations (see the Databricks Delta Lake performance optimization documentation), but I would advise against that.

Databricks' default settings in the most recent runtimes are pretty well optimized IMO. You could write quickly using 80 CPUs (so 80 partitions), but that would have a negative performance impact when reading the data.

Semantic partitioning of the Delta table is certainly a good idea (if not already done), and there is also Z-ORDER.
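For example (a minimal sketch; the table and column names are placeholders), Z-ordering is applied via the OPTIMIZE command, typically on columns you frequently filter or merge on:

# Placeholder table and column names; ZORDER BY co-locates related records in fewer files
spark.sql("OPTIMIZE my_table ZORDER BY (merge_key_column)")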

There is no simple answer to this.

If your merge does end up running in parallel, you also have to take the data skew into account.
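Since a merge runs a join under the hood, one thing that may be worth checking (a sketch, assuming a Spark 3.x / recent Databricks runtime; these settings are usually already on by default) is that adaptive query execution and its skew-join handling are enabled:

# AQE can split skewed join partitions into smaller tasks at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")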

Debugging is really hard, if not almost impossible, in Spark due to the parallel nature of the application.

Anonymous
Not applicable

Hi @Sanjay Jain

Hope everything is going great.

Just wanted to check in if you were able to resolve your issue. If yes, would you be happy to mark an answer as best so that other members can find the solution more quickly? If not, please tell us so we can help you. 

Cheers!

sanjay
Valued Contributor II

Hi Vidula,

I am not able to find the right solution to this problem. I would appreciate any help you can provide.

Regards,

Sanjay
