Data Engineering

Cannot display DataFrame when I filter by length

cralle
New Contributor II

I have a DataFrame that I created from a couple of datasets and multiple operations. The DataFrame has multiple columns, one of which is an array of strings. But when I try to filter the DataFrame on the size of this array column and execute the command, nothing happens: I get no Spark Jobs, no Stages, and the Ganglia cluster report shows no computation happening.

All I see in my cell is "Running Command". I am sure the filter is the cause, because I can display and count the DataFrame, but as soon as I apply a filter Databricks gets stuck. I also know it is not the size function that causes it, because I can add a new column with the size of the array and still display and count fine afterwards.

So I have a DataFrame I call df_merged_mapped. This is the DataFrame that I want to filter. So I can do this:

print(df_merged_mapped.count())
> 50414

I can also do:

from pyspark.sql import functions as F

print(df_merged_mapped.withColumn("len", F.size("new_topics")).count())
> 50414

But if I do this, then nothing will happen:

print(df_merged_mapped.withColumn("len", F.size("new_topics")).filter("len > 0").count())

It will just say "Running command...", no Stages or Jobs ever appear, and the cluster simply terminates after the 30-minute idle timeout triggers.

[Screenshot: the notebook cell stuck on "Running command..."]

And this is what the Ganglia cluster report looks like (I run a single-node cluster):

[Screenshot: Ganglia cluster report showing no activity]

I have also tried the following, and each of them stalls and never completes:

df_merged_mapped.createOrReplaceTempView("tmp")
df_new = spark.sql("SELECT *, size(new_topics) AS len FROM tmp WHERE size(new_topics) > 0")
print(df_merged_mapped.filter("size(new_topics) > 0").count())

Does anybody have any ideas why this is happening, and what I can do to solve the problem?

EDIT:

Running on a cluster with DBR 10.4 LTS.


7 REPLIES

-werners-
Esteemed Contributor III

Strange, it works fine here. What version of Databricks are you on?

What you could do to identify the issue is output the query plan (.explain()).

Creating a new df for each transformation could also help; that way you can check step by step where things go wrong.
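A minimal sketch of printing the plan for the stalling query, assuming df_merged_mapped and the functions import are in scope (explain's "formatted" mode is available on Spark 3.x, which DBR 10.4 runs):

from pyspark.sql import functions as F

# Print the physical plan without triggering execution; a very deep or
# exploded plan here would point to the lineage being the problem.
(df_merged_mapped
    .withColumn("len", F.size("new_topics"))
    .filter("len > 0")
    .explain(mode="formatted"))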

cralle
New Contributor II

DBR 10.4 LTS.

I run a lot of transformations and operations before this filter. But since I am able to display/count the DataFrame before the filter, I would assume the filter is what causes the error.

-werners-
Esteemed Contributor III
(Accepted solution)

I don't see why it would generate an error, as it works fine here.

Perhaps, as a test, serialize the dataframe (write it out and read it back in) and then try the filter + count.

If that works, it is probably your query plan that is too complex, or AQE going crazy.
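A minimal sketch of that test, with a hypothetical DBFS path:

# Hypothetical path; any writable location works.
tmp_path = "dbfs:/tmp/df_merged_mapped_roundtrip"

# Materialize the DataFrame, then read it back so the new DF starts
# with a trivial lineage instead of the full transformation history.
df_merged_mapped.write.mode("overwrite").parquet(tmp_path)
df_fresh = spark.read.parquet(tmp_path)

print(df_fresh.filter("size(new_topics) > 0").count())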

Hubert-Dudek
Esteemed Contributor III

If the query plan is that complex, do some checkpointing to break the plan in two.

Please also debug by calling .show() or display() on the dataframe one step before running .filter("len > 0") in the notebook.
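A minimal sketch of checkpointing, assuming a writable DBFS path for the checkpoint directory:

# Checkpointing needs a checkpoint directory; the path is an assumption.
spark.sparkContext.setCheckpointDir("dbfs:/tmp/checkpoints")

# checkpoint() eagerly materializes the DataFrame and truncates its
# lineage, splitting one huge query plan into smaller ones.
df_cut = df_merged_mapped.checkpoint()
print(df_cut.filter("size(new_topics) > 0").count())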

cralle
New Contributor II

What do you mean by checkpointing?

Also, I have used display as described in the question. I can display/count directly before the filter and it works fine, but if I do it directly after the filter, Spark just gets stuck and does nothing.

cralle
New Contributor II

@Werner Stinckens It worked when I wrote the DataFrame out and read it back in. So I guess your theory that the query plan was too complex is right.

-werners-
Esteemed Contributor III

Checkpointing is writing an intermediate dataframe to disk, so that the whole lineage leading to that DF can be forgotten.

Basically it is the same as writing to parquet and reading it back.

There are some technical differences though; the Spark documentation (and other sites) covers them in more detail.

So applying a checkpoint somewhere in your code creates two or more smaller query plans instead of one huge one.

That could be a solution; otherwise, figure out how to make the query simpler...
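For reference, df.localCheckpoint() is a related variant not mentioned above; a minimal sketch:

# localCheckpoint() eagerly materializes the DataFrame to executor-local
# storage and truncates its lineage; no checkpoint directory is needed,
# but it is less fault-tolerant than checkpoint(), which uses reliable storage.
df_cut = df_merged_mapped.localCheckpoint()
print(df_cut.filter("size(new_topics) > 0").count())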
